Commit 091e22b2 authored by Tom Lane's avatar Tom Lane

Clean up treatment of missing default and CHECK-constraint records.

Andrew Gierth reported that it's possible to crash the backend if no
pg_attrdef record is found to match an attribute that has atthasdef set.
AttrDefaultFetch warns about this situation, but then leaves behind
a relation tupdesc that has null "adbin" pointer(s), which most places
don't guard against.

We considered promoting the warning to an error, but throwing errors
during relcache load is pretty drastic: it effectively locks one out
of using the relation at all.  What seems better is to leave the
load-time behavior as a warning, but then throw an error in any code
path that wants to use a default and can't find it.  This confines
the error to a subset of INSERT/UPDATE operations on the table, and
in particular will at least allow a pg_dump to succeed.

Also, we should fix AttrDefaultFetch to not leave any null pointers
in the tupdesc, because that just creates an untested bug hazard.

While at it, apply the same philosophy of "warn at load, throw error
only upon use of the known-missing info" to CHECK constraints.
CheckConstraintFetch is very nearly the same logic as AttrDefaultFetch,
but for reasons lost in the mists of time, it was throwing ERROR for
the same cases that AttrDefaultFetch treats as WARNING.  Make the two
functions more nearly alike.

In passing, get rid of potentially-O(N^2) loops in equalTupleDesc
by making AttrDefaultFetch sort the entries after fetching them,
so that equalTupleDesc can assume that entries in two equal tupdescs
must be in matching order.  (CheckConstraintFetch already was sorting
CHECK constraints, but equalTupleDesc hadn't been told about it.)

There's some argument for back-patching this, but with such a small
number of field reports, I'm content to fix it in HEAD.

Discussion: https://postgr.es/m/87pmzaq4gx.fsf@news-spur.riddles.org.uk
parent 9de9294b
...@@ -174,10 +174,7 @@ CreateTupleDescCopyConstr(TupleDesc tupdesc) ...@@ -174,10 +174,7 @@ CreateTupleDescCopyConstr(TupleDesc tupdesc)
cpy->defval = (AttrDefault *) palloc(cpy->num_defval * sizeof(AttrDefault)); cpy->defval = (AttrDefault *) palloc(cpy->num_defval * sizeof(AttrDefault));
memcpy(cpy->defval, constr->defval, cpy->num_defval * sizeof(AttrDefault)); memcpy(cpy->defval, constr->defval, cpy->num_defval * sizeof(AttrDefault));
for (i = cpy->num_defval - 1; i >= 0; i--) for (i = cpy->num_defval - 1; i >= 0; i--)
{ cpy->defval[i].adbin = pstrdup(constr->defval[i].adbin);
if (constr->defval[i].adbin)
cpy->defval[i].adbin = pstrdup(constr->defval[i].adbin);
}
} }
if (constr->missing) if (constr->missing)
...@@ -203,10 +200,8 @@ CreateTupleDescCopyConstr(TupleDesc tupdesc) ...@@ -203,10 +200,8 @@ CreateTupleDescCopyConstr(TupleDesc tupdesc)
memcpy(cpy->check, constr->check, cpy->num_check * sizeof(ConstrCheck)); memcpy(cpy->check, constr->check, cpy->num_check * sizeof(ConstrCheck));
for (i = cpy->num_check - 1; i >= 0; i--) for (i = cpy->num_check - 1; i >= 0; i--)
{ {
if (constr->check[i].ccname) cpy->check[i].ccname = pstrdup(constr->check[i].ccname);
cpy->check[i].ccname = pstrdup(constr->check[i].ccname); cpy->check[i].ccbin = pstrdup(constr->check[i].ccbin);
if (constr->check[i].ccbin)
cpy->check[i].ccbin = pstrdup(constr->check[i].ccbin);
cpy->check[i].ccvalid = constr->check[i].ccvalid; cpy->check[i].ccvalid = constr->check[i].ccvalid;
cpy->check[i].ccnoinherit = constr->check[i].ccnoinherit; cpy->check[i].ccnoinherit = constr->check[i].ccnoinherit;
} }
...@@ -328,10 +323,7 @@ FreeTupleDesc(TupleDesc tupdesc) ...@@ -328,10 +323,7 @@ FreeTupleDesc(TupleDesc tupdesc)
AttrDefault *attrdef = tupdesc->constr->defval; AttrDefault *attrdef = tupdesc->constr->defval;
for (i = tupdesc->constr->num_defval - 1; i >= 0; i--) for (i = tupdesc->constr->num_defval - 1; i >= 0; i--)
{ pfree(attrdef[i].adbin);
if (attrdef[i].adbin)
pfree(attrdef[i].adbin);
}
pfree(attrdef); pfree(attrdef);
} }
if (tupdesc->constr->missing) if (tupdesc->constr->missing)
...@@ -352,10 +344,8 @@ FreeTupleDesc(TupleDesc tupdesc) ...@@ -352,10 +344,8 @@ FreeTupleDesc(TupleDesc tupdesc)
for (i = tupdesc->constr->num_check - 1; i >= 0; i--) for (i = tupdesc->constr->num_check - 1; i >= 0; i--)
{ {
if (check[i].ccname) pfree(check[i].ccname);
pfree(check[i].ccname); pfree(check[i].ccbin);
if (check[i].ccbin)
pfree(check[i].ccbin);
} }
pfree(check); pfree(check);
} }
...@@ -412,7 +402,6 @@ bool ...@@ -412,7 +402,6 @@ bool
equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2) equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2)
{ {
int i, int i,
j,
n; n;
if (tupdesc1->natts != tupdesc2->natts) if (tupdesc1->natts != tupdesc2->natts)
...@@ -488,22 +477,13 @@ equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2) ...@@ -488,22 +477,13 @@ equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2)
n = constr1->num_defval; n = constr1->num_defval;
if (n != (int) constr2->num_defval) if (n != (int) constr2->num_defval)
return false; return false;
/* We assume here that both AttrDefault arrays are in adnum order */
for (i = 0; i < n; i++) for (i = 0; i < n; i++)
{ {
AttrDefault *defval1 = constr1->defval + i; AttrDefault *defval1 = constr1->defval + i;
AttrDefault *defval2 = constr2->defval; AttrDefault *defval2 = constr2->defval + i;
/* if (defval1->adnum != defval2->adnum)
* We can't assume that the items are always read from the system
* catalogs in the same order; so use the adnum field to identify
* the matching item to compare.
*/
for (j = 0; j < n; defval2++, j++)
{
if (defval1->adnum == defval2->adnum)
break;
}
if (j >= n)
return false; return false;
if (strcmp(defval1->adbin, defval2->adbin) != 0) if (strcmp(defval1->adbin, defval2->adbin) != 0)
return false; return false;
...@@ -534,25 +514,21 @@ equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2) ...@@ -534,25 +514,21 @@ equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2)
n = constr1->num_check; n = constr1->num_check;
if (n != (int) constr2->num_check) if (n != (int) constr2->num_check)
return false; return false;
/*
* Similarly, we rely here on the ConstrCheck entries being sorted by
* name. If there are duplicate names, the outcome of the comparison
* is uncertain, but that should not happen.
*/
for (i = 0; i < n; i++) for (i = 0; i < n; i++)
{ {
ConstrCheck *check1 = constr1->check + i; ConstrCheck *check1 = constr1->check + i;
ConstrCheck *check2 = constr2->check; ConstrCheck *check2 = constr2->check + i;
/* if (!(strcmp(check1->ccname, check2->ccname) == 0 &&
* Similarly, don't assume that the checks are always read in the strcmp(check1->ccbin, check2->ccbin) == 0 &&
* same order; match them up by name and contents. (The name check1->ccvalid == check2->ccvalid &&
* *should* be unique, but...) check1->ccnoinherit == check2->ccnoinherit))
*/
for (j = 0; j < n; check2++, j++)
{
if (strcmp(check1->ccname, check2->ccname) == 0 &&
strcmp(check1->ccbin, check2->ccbin) == 0 &&
check1->ccvalid == check2->ccvalid &&
check1->ccnoinherit == check2->ccnoinherit)
break;
}
if (j >= n)
return false; return false;
} }
} }
......
...@@ -2501,21 +2501,24 @@ MergeAttributes(List *schema, List *supers, char relpersistence, ...@@ -2501,21 +2501,24 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
if (attribute->atthasdef) if (attribute->atthasdef)
{ {
Node *this_default = NULL; Node *this_default = NULL;
AttrDefault *attrdef;
int i;
/* Find default in constraint structure */ /* Find default in constraint structure */
Assert(constr != NULL); if (constr != NULL)
attrdef = constr->defval;
for (i = 0; i < constr->num_defval; i++)
{ {
if (attrdef[i].adnum == parent_attno) AttrDefault *attrdef = constr->defval;
for (int i = 0; i < constr->num_defval; i++)
{ {
this_default = stringToNode(attrdef[i].adbin); if (attrdef[i].adnum == parent_attno)
break; {
this_default = stringToNode(attrdef[i].adbin);
break;
}
} }
} }
Assert(this_default != NULL); if (this_default == NULL)
elog(ERROR, "default expression not found for attribute %d of relation \"%s\"",
parent_attno, RelationGetRelationName(relation));
/* /*
* If it's a GENERATED default, it might contain Vars that * If it's a GENERATED default, it might contain Vars that
......
...@@ -1609,6 +1609,15 @@ ExecRelCheck(ResultRelInfo *resultRelInfo, ...@@ -1609,6 +1609,15 @@ ExecRelCheck(ResultRelInfo *resultRelInfo,
MemoryContext oldContext; MemoryContext oldContext;
int i; int i;
/*
* CheckConstraintFetch let this pass with only a warning, but now we
* should fail rather than possibly failing to enforce an important
* constraint.
*/
if (ncheck != rel->rd_rel->relchecks)
elog(ERROR, "%d pg_constraint record(s) missing for relation \"%s\"",
rel->rd_rel->relchecks - ncheck, RelationGetRelationName(rel));
/* /*
* If first time through for this result relation, build expression * If first time through for this result relation, build expression
* nodetrees for rel's constraint expressions. Keep them in the per-query * nodetrees for rel's constraint expressions. Keep them in the per-query
...@@ -1862,7 +1871,7 @@ ExecConstraints(ResultRelInfo *resultRelInfo, ...@@ -1862,7 +1871,7 @@ ExecConstraints(ResultRelInfo *resultRelInfo,
} }
} }
if (constr->num_check > 0) if (rel->rd_rel->relchecks > 0)
{ {
const char *failed; const char *failed;
......
...@@ -1239,8 +1239,6 @@ expandTableLikeClause(RangeVar *heapRel, TableLikeClause *table_like_clause) ...@@ -1239,8 +1239,6 @@ expandTableLikeClause(RangeVar *heapRel, TableLikeClause *table_like_clause)
(CREATE_TABLE_LIKE_DEFAULTS | CREATE_TABLE_LIKE_GENERATED)) && (CREATE_TABLE_LIKE_DEFAULTS | CREATE_TABLE_LIKE_GENERATED)) &&
constr != NULL) constr != NULL)
{ {
AttrDefault *attrdef = constr->defval;
for (parent_attno = 1; parent_attno <= tupleDesc->natts; for (parent_attno = 1; parent_attno <= tupleDesc->natts;
parent_attno++) parent_attno++)
{ {
...@@ -1264,6 +1262,7 @@ expandTableLikeClause(RangeVar *heapRel, TableLikeClause *table_like_clause) ...@@ -1264,6 +1262,7 @@ expandTableLikeClause(RangeVar *heapRel, TableLikeClause *table_like_clause)
(table_like_clause->options & CREATE_TABLE_LIKE_DEFAULTS))) (table_like_clause->options & CREATE_TABLE_LIKE_DEFAULTS)))
{ {
Node *this_default = NULL; Node *this_default = NULL;
AttrDefault *attrdef = constr->defval;
AlterTableCmd *atsubcmd; AlterTableCmd *atsubcmd;
bool found_whole_row; bool found_whole_row;
...@@ -1276,7 +1275,9 @@ expandTableLikeClause(RangeVar *heapRel, TableLikeClause *table_like_clause) ...@@ -1276,7 +1275,9 @@ expandTableLikeClause(RangeVar *heapRel, TableLikeClause *table_like_clause)
break; break;
} }
} }
Assert(this_default != NULL); if (this_default == NULL)
elog(ERROR, "default expression not found for attribute %d of relation \"%s\"",
parent_attno, RelationGetRelationName(relation));
atsubcmd = makeNode(AlterTableCmd); atsubcmd = makeNode(AlterTableCmd);
atsubcmd->subtype = AT_CookedColumnDefault; atsubcmd->subtype = AT_CookedColumnDefault;
......
...@@ -1228,25 +1228,28 @@ build_column_default(Relation rel, int attrno) ...@@ -1228,25 +1228,28 @@ build_column_default(Relation rel, int attrno)
} }
/* /*
* Scan to see if relation has a default for this column. * If relation has a default for this column, fetch that expression.
*/ */
if (att_tup->atthasdef && rd_att->constr && if (att_tup->atthasdef)
rd_att->constr->num_defval > 0)
{ {
AttrDefault *defval = rd_att->constr->defval; if (rd_att->constr && rd_att->constr->num_defval > 0)
int ndef = rd_att->constr->num_defval;
while (--ndef >= 0)
{ {
if (attrno == defval[ndef].adnum) AttrDefault *defval = rd_att->constr->defval;
int ndef = rd_att->constr->num_defval;
while (--ndef >= 0)
{ {
/* if (attrno == defval[ndef].adnum)
* Found it, convert string representation to node tree. {
*/ /* Found it, convert string representation to node tree. */
expr = stringToNode(defval[ndef].adbin); expr = stringToNode(defval[ndef].adbin);
break; break;
}
} }
} }
if (expr == NULL)
elog(ERROR, "default expression not found for attribute %d of relation \"%s\"",
attrno, RelationGetRelationName(rel));
} }
/* /*
......
...@@ -282,7 +282,8 @@ static void RelationInitPhysicalAddr(Relation relation); ...@@ -282,7 +282,8 @@ static void RelationInitPhysicalAddr(Relation relation);
static void load_critical_index(Oid indexoid, Oid heapoid); static void load_critical_index(Oid indexoid, Oid heapoid);
static TupleDesc GetPgClassDescriptor(void); static TupleDesc GetPgClassDescriptor(void);
static TupleDesc GetPgIndexDescriptor(void); static TupleDesc GetPgIndexDescriptor(void);
static void AttrDefaultFetch(Relation relation); static void AttrDefaultFetch(Relation relation, int ndef);
static int AttrDefaultCmp(const void *a, const void *b);
static void CheckConstraintFetch(Relation relation); static void CheckConstraintFetch(Relation relation);
static int CheckConstraintCmp(const void *a, const void *b); static int CheckConstraintCmp(const void *a, const void *b);
static void InitIndexAmRoutine(Relation relation); static void InitIndexAmRoutine(Relation relation);
...@@ -503,7 +504,6 @@ RelationBuildTupleDesc(Relation relation) ...@@ -503,7 +504,6 @@ RelationBuildTupleDesc(Relation relation)
ScanKeyData skey[2]; ScanKeyData skey[2];
int need; int need;
TupleConstr *constr; TupleConstr *constr;
AttrDefault *attrdef = NULL;
AttrMissing *attrmiss = NULL; AttrMissing *attrmiss = NULL;
int ndef = 0; int ndef = 0;
...@@ -512,8 +512,8 @@ RelationBuildTupleDesc(Relation relation) ...@@ -512,8 +512,8 @@ RelationBuildTupleDesc(Relation relation)
relation->rd_rel->reltype ? relation->rd_rel->reltype : RECORDOID; relation->rd_rel->reltype ? relation->rd_rel->reltype : RECORDOID;
relation->rd_att->tdtypmod = -1; /* just to be sure */ relation->rd_att->tdtypmod = -1; /* just to be sure */
constr = (TupleConstr *) MemoryContextAlloc(CacheMemoryContext, constr = (TupleConstr *) MemoryContextAllocZero(CacheMemoryContext,
sizeof(TupleConstr)); sizeof(TupleConstr));
constr->has_not_null = false; constr->has_not_null = false;
constr->has_generated_stored = false; constr->has_generated_stored = false;
...@@ -557,10 +557,9 @@ RelationBuildTupleDesc(Relation relation) ...@@ -557,10 +557,9 @@ RelationBuildTupleDesc(Relation relation)
attnum = attp->attnum; attnum = attp->attnum;
if (attnum <= 0 || attnum > RelationGetNumberOfAttributes(relation)) if (attnum <= 0 || attnum > RelationGetNumberOfAttributes(relation))
elog(ERROR, "invalid attribute number %d for %s", elog(ERROR, "invalid attribute number %d for relation \"%s\"",
attp->attnum, RelationGetRelationName(relation)); attp->attnum, RelationGetRelationName(relation));
memcpy(TupleDescAttr(relation->rd_att, attnum - 1), memcpy(TupleDescAttr(relation->rd_att, attnum - 1),
attp, attp,
ATTRIBUTE_FIXED_PART_SIZE); ATTRIBUTE_FIXED_PART_SIZE);
...@@ -570,22 +569,10 @@ RelationBuildTupleDesc(Relation relation) ...@@ -570,22 +569,10 @@ RelationBuildTupleDesc(Relation relation)
constr->has_not_null = true; constr->has_not_null = true;
if (attp->attgenerated == ATTRIBUTE_GENERATED_STORED) if (attp->attgenerated == ATTRIBUTE_GENERATED_STORED)
constr->has_generated_stored = true; constr->has_generated_stored = true;
/* If the column has a default, fill it into the attrdef array */
if (attp->atthasdef) if (attp->atthasdef)
{
if (attrdef == NULL)
attrdef = (AttrDefault *)
MemoryContextAllocZero(CacheMemoryContext,
RelationGetNumberOfAttributes(relation) *
sizeof(AttrDefault));
attrdef[ndef].adnum = attnum;
attrdef[ndef].adbin = NULL;
ndef++; ndef++;
}
/* Likewise for a missing value */ /* If the column has a "missing" value, put it in the attrmiss array */
if (attp->atthasmissing) if (attp->atthasmissing)
{ {
Datum missingval; Datum missingval;
...@@ -648,7 +635,7 @@ RelationBuildTupleDesc(Relation relation) ...@@ -648,7 +635,7 @@ RelationBuildTupleDesc(Relation relation)
table_close(pg_attribute_desc, AccessShareLock); table_close(pg_attribute_desc, AccessShareLock);
if (need != 0) if (need != 0)
elog(ERROR, "catalog is missing %d attribute(s) for relid %u", elog(ERROR, "pg_attribute catalog is missing %d attribute(s) for relation OID %u",
need, RelationGetRelid(relation)); need, RelationGetRelid(relation));
/* /*
...@@ -680,33 +667,19 @@ RelationBuildTupleDesc(Relation relation) ...@@ -680,33 +667,19 @@ RelationBuildTupleDesc(Relation relation)
constr->has_generated_stored || constr->has_generated_stored ||
ndef > 0 || ndef > 0 ||
attrmiss || attrmiss ||
relation->rd_rel->relchecks) relation->rd_rel->relchecks > 0)
{ {
relation->rd_att->constr = constr; relation->rd_att->constr = constr;
if (ndef > 0) /* DEFAULTs */ if (ndef > 0) /* DEFAULTs */
{ AttrDefaultFetch(relation, ndef);
if (ndef < RelationGetNumberOfAttributes(relation))
constr->defval = (AttrDefault *)
repalloc(attrdef, ndef * sizeof(AttrDefault));
else
constr->defval = attrdef;
constr->num_defval = ndef;
AttrDefaultFetch(relation);
}
else else
constr->num_defval = 0; constr->num_defval = 0;
constr->missing = attrmiss; constr->missing = attrmiss;
if (relation->rd_rel->relchecks > 0) /* CHECKs */ if (relation->rd_rel->relchecks > 0) /* CHECKs */
{
constr->num_check = relation->rd_rel->relchecks;
constr->check = (ConstrCheck *)
MemoryContextAllocZero(CacheMemoryContext,
constr->num_check * sizeof(ConstrCheck));
CheckConstraintFetch(relation); CheckConstraintFetch(relation);
}
else else
constr->num_check = 0; constr->num_check = 0;
} }
...@@ -4251,21 +4224,29 @@ GetPgIndexDescriptor(void) ...@@ -4251,21 +4224,29 @@ GetPgIndexDescriptor(void)
/* /*
* Load any default attribute value definitions for the relation. * Load any default attribute value definitions for the relation.
*
* ndef is the number of attributes that were marked atthasdef.
*
* Note: we don't make it a hard error to be missing some pg_attrdef records.
* We can limp along as long as nothing needs to use the default value. Code
* that fails to find an expected AttrDefault record should throw an error.
*/ */
static void static void
AttrDefaultFetch(Relation relation) AttrDefaultFetch(Relation relation, int ndef)
{ {
AttrDefault *attrdef = relation->rd_att->constr->defval; AttrDefault *attrdef;
int ndef = relation->rd_att->constr->num_defval;
Relation adrel; Relation adrel;
SysScanDesc adscan; SysScanDesc adscan;
ScanKeyData skey; ScanKeyData skey;
HeapTuple htup; HeapTuple htup;
Datum val; int found = 0;
bool isnull;
int found; /* Allocate array with room for as many entries as expected */
int i; attrdef = (AttrDefault *)
MemoryContextAllocZero(CacheMemoryContext,
ndef * sizeof(AttrDefault));
/* Search pg_attrdef for relevant entries */
ScanKeyInit(&skey, ScanKeyInit(&skey,
Anum_pg_attrdef_adrelid, Anum_pg_attrdef_adrelid,
BTEqualStrategyNumber, F_OIDEQ, BTEqualStrategyNumber, F_OIDEQ,
...@@ -4274,65 +4255,94 @@ AttrDefaultFetch(Relation relation) ...@@ -4274,65 +4255,94 @@ AttrDefaultFetch(Relation relation)
adrel = table_open(AttrDefaultRelationId, AccessShareLock); adrel = table_open(AttrDefaultRelationId, AccessShareLock);
adscan = systable_beginscan(adrel, AttrDefaultIndexId, true, adscan = systable_beginscan(adrel, AttrDefaultIndexId, true,
NULL, 1, &skey); NULL, 1, &skey);
found = 0;
while (HeapTupleIsValid(htup = systable_getnext(adscan))) while (HeapTupleIsValid(htup = systable_getnext(adscan)))
{ {
Form_pg_attrdef adform = (Form_pg_attrdef) GETSTRUCT(htup); Form_pg_attrdef adform = (Form_pg_attrdef) GETSTRUCT(htup);
Form_pg_attribute attr = TupleDescAttr(relation->rd_att, adform->adnum - 1); Datum val;
bool isnull;
for (i = 0; i < ndef; i++) /* protect limited size of array */
if (found >= ndef)
{ {
if (adform->adnum != attrdef[i].adnum) elog(WARNING, "unexpected pg_attrdef record found for attribute %d of relation \"%s\"",
continue; adform->adnum, RelationGetRelationName(relation));
if (attrdef[i].adbin != NULL)
elog(WARNING, "multiple attrdef records found for attr %s of rel %s",
NameStr(attr->attname),
RelationGetRelationName(relation));
else
found++;
val = fastgetattr(htup,
Anum_pg_attrdef_adbin,
adrel->rd_att, &isnull);
if (isnull)
elog(WARNING, "null adbin for attr %s of rel %s",
NameStr(attr->attname),
RelationGetRelationName(relation));
else
{
/* detoast and convert to cstring in caller's context */
char *s = TextDatumGetCString(val);
attrdef[i].adbin = MemoryContextStrdup(CacheMemoryContext, s);
pfree(s);
}
break; break;
} }
if (i >= ndef) val = fastgetattr(htup,
elog(WARNING, "unexpected attrdef record found for attr %d of rel %s", Anum_pg_attrdef_adbin,
adrel->rd_att, &isnull);
if (isnull)
elog(WARNING, "null adbin for attribute %d of relation \"%s\"",
adform->adnum, RelationGetRelationName(relation)); adform->adnum, RelationGetRelationName(relation));
else
{
/* detoast and convert to cstring in caller's context */
char *s = TextDatumGetCString(val);
attrdef[found].adnum = adform->adnum;
attrdef[found].adbin = MemoryContextStrdup(CacheMemoryContext, s);
pfree(s);
found++;
}
} }
systable_endscan(adscan); systable_endscan(adscan);
table_close(adrel, AccessShareLock); table_close(adrel, AccessShareLock);
if (found != ndef)
elog(WARNING, "%d pg_attrdef record(s) missing for relation \"%s\"",
ndef - found, RelationGetRelationName(relation));
/*
* Sort the AttrDefault entries by adnum, for the convenience of
* equalTupleDescs(). (Usually, they already will be in order, but this
* might not be so if systable_getnext isn't using an index.)
*/
if (found > 1)
qsort(attrdef, found, sizeof(AttrDefault), AttrDefaultCmp);
/* Install array only after it's fully valid */
relation->rd_att->constr->defval = attrdef;
relation->rd_att->constr->num_defval = found;
}
/*
* qsort comparator to sort AttrDefault entries by adnum
*/
static int
AttrDefaultCmp(const void *a, const void *b)
{
const AttrDefault *ada = (const AttrDefault *) a;
const AttrDefault *adb = (const AttrDefault *) b;
return ada->adnum - adb->adnum;
} }
/* /*
* Load any check constraints for the relation. * Load any check constraints for the relation.
*
* As with defaults, if we don't find the expected number of them, just warn
* here. The executor should throw an error if an INSERT/UPDATE is attempted.
*/ */
static void static void
CheckConstraintFetch(Relation relation) CheckConstraintFetch(Relation relation)
{ {
ConstrCheck *check = relation->rd_att->constr->check; ConstrCheck *check;
int ncheck = relation->rd_att->constr->num_check; int ncheck = relation->rd_rel->relchecks;
Relation conrel; Relation conrel;
SysScanDesc conscan; SysScanDesc conscan;
ScanKeyData skey[1]; ScanKeyData skey[1];
HeapTuple htup; HeapTuple htup;
int found = 0; int found = 0;
/* Allocate array with room for as many entries as expected */
check = (ConstrCheck *)
MemoryContextAllocZero(CacheMemoryContext,
ncheck * sizeof(ConstrCheck));
/* Search pg_constraint for relevant entries */
ScanKeyInit(&skey[0], ScanKeyInit(&skey[0],
Anum_pg_constraint_conrelid, Anum_pg_constraint_conrelid,
BTEqualStrategyNumber, F_OIDEQ, BTEqualStrategyNumber, F_OIDEQ,
...@@ -4347,15 +4357,18 @@ CheckConstraintFetch(Relation relation) ...@@ -4347,15 +4357,18 @@ CheckConstraintFetch(Relation relation)
Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup); Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
Datum val; Datum val;
bool isnull; bool isnull;
char *s;
/* We want check constraints only */ /* We want check constraints only */
if (conform->contype != CONSTRAINT_CHECK) if (conform->contype != CONSTRAINT_CHECK)
continue; continue;
/* protect limited size of array */
if (found >= ncheck) if (found >= ncheck)
elog(ERROR, "unexpected constraint record found for rel %s", {
elog(WARNING, "unexpected pg_constraint record found for relation \"%s\"",
RelationGetRelationName(relation)); RelationGetRelationName(relation));
break;
}
check[found].ccvalid = conform->convalidated; check[found].ccvalid = conform->convalidated;
check[found].ccnoinherit = conform->connoinherit; check[found].ccnoinherit = conform->connoinherit;
...@@ -4367,27 +4380,36 @@ CheckConstraintFetch(Relation relation) ...@@ -4367,27 +4380,36 @@ CheckConstraintFetch(Relation relation)
Anum_pg_constraint_conbin, Anum_pg_constraint_conbin,
conrel->rd_att, &isnull); conrel->rd_att, &isnull);
if (isnull) if (isnull)
elog(ERROR, "null conbin for rel %s", elog(WARNING, "null conbin for relation \"%s\"",
RelationGetRelationName(relation)); RelationGetRelationName(relation));
else
{
/* detoast and convert to cstring in caller's context */
char *s = TextDatumGetCString(val);
/* detoast and convert to cstring in caller's context */ check[found].ccbin = MemoryContextStrdup(CacheMemoryContext, s);
s = TextDatumGetCString(val); pfree(s);
check[found].ccbin = MemoryContextStrdup(CacheMemoryContext, s); found++;
pfree(s); }
found++;
} }
systable_endscan(conscan); systable_endscan(conscan);
table_close(conrel, AccessShareLock); table_close(conrel, AccessShareLock);
if (found != ncheck) if (found != ncheck)
elog(ERROR, "%d constraint record(s) missing for rel %s", elog(WARNING, "%d pg_constraint record(s) missing for relation \"%s\"",
ncheck - found, RelationGetRelationName(relation)); ncheck - found, RelationGetRelationName(relation));
/* Sort the records so that CHECKs are applied in a deterministic order */ /*
if (ncheck > 1) * Sort the records by name. This ensures that CHECKs are applied in a
qsort(check, ncheck, sizeof(ConstrCheck), CheckConstraintCmp); * deterministic order, and it also makes equalTupleDescs() faster.
*/
if (found > 1)
qsort(check, found, sizeof(ConstrCheck), CheckConstraintCmp);
/* Install array only after it's fully valid */
relation->rd_att->constr->check = check;
relation->rd_att->constr->num_check = found;
} }
/* /*
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment