diff options
Diffstat (limited to 'src/backend/commands/copy.c')
-rw-r--r-- | src/backend/commands/copy.c | 218 |
1 files changed, 176 insertions, 42 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 7410bff04b1..438126a3e18 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -7,7 +7,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/commands/copy.c,v 1.158 2002/06/20 20:29:27 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/commands/copy.c,v 1.159 2002/07/18 04:43:50 momjian Exp $ * *------------------------------------------------------------------------- */ @@ -28,6 +28,7 @@ #include "commands/copy.h" #include "commands/trigger.h" #include "executor/executor.h" +#include "rewrite/rewriteHandler.h" #include "libpq/libpq.h" #include "miscadmin.h" #include "tcop/pquery.h" @@ -46,13 +47,14 @@ /* non-export function prototypes */ -static void CopyTo(Relation rel, bool binary, bool oids, FILE *fp, char *delim, char *null_print); -static void CopyFrom(Relation rel, bool binary, bool oids, FILE *fp, char *delim, char *null_print); +static void CopyTo(Relation rel, List *attlist, bool binary, bool oids, FILE *fp, char *delim, char *null_print); +static void CopyFrom(Relation rel, List *attlist, bool binary, bool oids, FILE *fp, char *delim, char *null_print); static Oid GetInputFunction(Oid type); static Oid GetTypeElement(Oid type); static void CopyReadNewline(FILE *fp, int *newline); static char *CopyReadAttribute(FILE *fp, bool *isnull, char *delim, int *newline, char *null_print); static void CopyAttributeOut(FILE *fp, char *string, char *delim); +static void CopyAssertAttlist(Relation rel, List* attlist, bool from); static const char BinarySignature[12] = "PGBCOPY\n\377\r\n\0"; @@ -267,7 +269,8 @@ DoCopy(const CopyStmt *stmt) char *filename = stmt->filename; bool is_from = stmt->is_from; bool pipe = (stmt->filename == NULL); - List *option; + List *option; + List *attlist = stmt->attlist; DefElem *dbinary = NULL; DefElem *doids = NULL; DefElem *ddelim = NULL; @@ -289,25 +292,27 @@ DoCopy(const CopyStmt *stmt) if (strcmp(defel->defname, "binary") == 0) { if (dbinary) - elog(ERROR, "COPY: conflicting options"); + /* should this really be an error? */ + elog(ERROR, "COPY: BINARY option appears more than once"); dbinary = defel; } else if (strcmp(defel->defname, "oids") == 0) { if (doids) - elog(ERROR, "COPY: conflicting options"); + /* should this really be an error? */ + elog(ERROR, "COPY: OIDS option appears more than once"); doids = defel; } else if (strcmp(defel->defname, "delimiter") == 0) { if (ddelim) - elog(ERROR, "COPY: conflicting options"); + elog(ERROR, "COPY: DELIMITER string may only be defined once in query"); ddelim = defel; } else if (strcmp(defel->defname, "null") == 0) { if (dnull) - elog(ERROR, "COPY: conflicting options"); + elog(ERROR, "COPY: NULL representation may only be defined once in query"); dnull = defel; } else @@ -367,6 +372,24 @@ DoCopy(const CopyStmt *stmt) server_encoding = GetDatabaseEncoding(); #endif + if( attlist == NIL ){ + /* get list of attributes in the relation */ + TupleDesc desc = RelationGetDescr(rel); + int i; + for(i = 0; i < desc->natts; ++i){ + Ident* id = makeNode(Ident); + id->name = NameStr(desc->attrs[i]->attname); + attlist = lappend(attlist,id); + } + } + else{ + if( binary ){ + elog(ERROR,"COPY: BINARY format cannot be used with specific column list"); + } + /* verify that any user-specified attributes exist in the relation */ + CopyAssertAttlist(rel,attlist,is_from); + } + if (is_from) { /* copy from file to database */ if (rel->rd_rel->relkind != RELKIND_RELATION) @@ -410,7 +433,7 @@ DoCopy(const CopyStmt *stmt) elog(ERROR, "COPY: %s is a directory.", filename); } } - CopyFrom(rel, binary, oids, fp, delim, null_print); + CopyFrom(rel, attlist, binary, oids, fp, delim, null_print); } else { /* copy from database to file */ @@ -466,7 +489,7 @@ DoCopy(const CopyStmt *stmt) elog(ERROR, "COPY: %s is a directory.", filename); } } - CopyTo(rel, binary, oids, fp, delim, null_print); + CopyTo(rel, attlist, binary, oids, fp, delim, null_print); } if (!pipe) @@ -494,8 +517,8 @@ DoCopy(const CopyStmt *stmt) * Copy from relation TO file. */ static void -CopyTo(Relation rel, bool binary, bool oids, FILE *fp, - char *delim, char *null_print) +CopyTo(Relation rel, List *attlist, bool binary, bool oids, + FILE *fp, char *delim, char *null_print) { HeapTuple tuple; TupleDesc tupDesc; @@ -509,6 +532,10 @@ CopyTo(Relation rel, bool binary, bool oids, FILE *fp, int16 fld_size; char *string; Snapshot mySnapshot; + int copy_attr_count; + int* attmap; + int p = 0; + List* cur; if (oids && !rel->rd_rel->relhasoids) elog(ERROR, "COPY: table %s does not have OIDs", @@ -517,6 +544,18 @@ CopyTo(Relation rel, bool binary, bool oids, FILE *fp, tupDesc = rel->rd_att; attr_count = rel->rd_att->natts; attr = rel->rd_att->attrs; + copy_attr_count = length(attlist); + { + attmap = (int*)palloc(copy_attr_count * sizeof(int)); + foreach(cur,attlist){ + for (i = 0; i < attr_count; i++){ + if( strcmp(strVal(lfirst(cur)),NameStr(attr[i]->attname)) == 0){ + attmap[p++] = i; + continue; + } + } + } + } /* * For binary copy we really only need isvarlena, but compute it @@ -593,13 +632,14 @@ CopyTo(Relation rel, bool binary, bool oids, FILE *fp, } } - for (i = 0; i < attr_count; i++) + for (i = 0; i < copy_attr_count; i++) { Datum origvalue, value; bool isnull; + int mi = attmap[i]; - origvalue = heap_getattr(tuple, i + 1, tupDesc, &isnull); + origvalue = heap_getattr(tuple, mi + 1, tupDesc, &isnull); if (!binary) { @@ -628,25 +668,25 @@ CopyTo(Relation rel, bool binary, bool oids, FILE *fp, * (or for binary case, becase we must output untoasted * value). */ - if (isvarlena[i]) + if (isvarlena[mi]) value = PointerGetDatum(PG_DETOAST_DATUM(origvalue)); else value = origvalue; if (!binary) { - string = DatumGetCString(FunctionCall3(&out_functions[i], + string = DatumGetCString(FunctionCall3(&out_functions[mi], value, - ObjectIdGetDatum(elements[i]), - Int32GetDatum(attr[i]->atttypmod))); + ObjectIdGetDatum(elements[mi]), + Int32GetDatum(attr[mi]->atttypmod))); CopyAttributeOut(fp, string, delim); pfree(string); } else { - fld_size = attr[i]->attlen; + fld_size = attr[mi]->attlen; CopySendData(&fld_size, sizeof(int16), fp); - if (isvarlena[i]) + if (isvarlena[mi]) { /* varlena */ Assert(fld_size == -1); @@ -654,7 +694,7 @@ CopyTo(Relation rel, bool binary, bool oids, FILE *fp, VARSIZE(value), fp); } - else if (!attr[i]->attbyval) + else if (!attr[mi]->attbyval) { /* fixed-length pass-by-reference */ Assert(fld_size > 0); @@ -709,13 +749,13 @@ CopyTo(Relation rel, bool binary, bool oids, FILE *fp, * Copy FROM file to relation. */ static void -CopyFrom(Relation rel, bool binary, bool oids, FILE *fp, - char *delim, char *null_print) +CopyFrom(Relation rel, List *attlist, bool binary, bool oids, + FILE *fp, char *delim, char *null_print) { HeapTuple tuple; TupleDesc tupDesc; Form_pg_attribute *attr; - AttrNumber attr_count; + AttrNumber attr_count, copy_attr_count, def_attr_count; FmgrInfo *in_functions; Oid *elements; int i; @@ -732,10 +772,17 @@ CopyFrom(Relation rel, bool binary, bool oids, FILE *fp, Oid loaded_oid = InvalidOid; bool skip_tuple = false; bool file_has_oids; + int* attmap = NULL; + int* defmap = NULL; + Node** defexprs = NULL; /* array of default att expressions */ + ExprContext *econtext; /* used for ExecEvalExpr for default atts */ + ExprDoneCond isdone; tupDesc = RelationGetDescr(rel); attr = tupDesc->attrs; attr_count = tupDesc->natts; + copy_attr_count = length(attlist); + def_attr_count = 0; /* * We need a ResultRelInfo so we can use the regular executor's @@ -758,15 +805,42 @@ CopyFrom(Relation rel, bool binary, bool oids, FILE *fp, slot = ExecAllocTableSlot(tupleTable); ExecSetSlotDescriptor(slot, tupDesc, false); + if (!binary) { + /* + * pick up the input function and default expression (if any) for + * each attribute in the relation. + */ + List* cur; + attmap = (int*)palloc(sizeof(int) * attr_count); + defmap = (int*)palloc(sizeof(int) * attr_count); + defexprs = (Node**)palloc(sizeof(Node*) * attr_count); in_functions = (FmgrInfo *) palloc(attr_count * sizeof(FmgrInfo)); elements = (Oid *) palloc(attr_count * sizeof(Oid)); for (i = 0; i < attr_count; i++) { + int p = 0; + bool specified = false; in_func_oid = (Oid) GetInputFunction(attr[i]->atttypid); fmgr_info(in_func_oid, &in_functions[i]); elements[i] = GetTypeElement(attr[i]->atttypid); + foreach(cur,attlist){ + if( strcmp(strVal(lfirst(cur)),NameStr(attr[i]->attname)) == 0){ + attmap[p] = i; + specified = true; + continue; + } + ++p; + } + if( ! specified ){ + /* column not specified, try to get a default */ + defexprs[def_attr_count] = build_column_default(rel,i+1); + if( defexprs[def_attr_count] != NULL ){ + defmap[def_attr_count] = i; + ++def_attr_count; + } + } } file_has_oids = oids; /* must rely on user to tell us this... */ } @@ -821,12 +895,14 @@ CopyFrom(Relation rel, bool binary, bool oids, FILE *fp, copy_lineno = 0; fe_eof = false; + econtext = GetPerTupleExprContext(estate); + while (!done) { CHECK_FOR_INTERRUPTS(); copy_lineno++; - + /* Reset the per-output-tuple exprcontext */ ResetPerTupleExprContext(estate); @@ -854,26 +930,42 @@ CopyFrom(Relation rel, bool binary, bool oids, FILE *fp, elog(ERROR, "COPY TEXT: Invalid Oid"); } } - - for (i = 0; i < attr_count && !done; i++) + + /* + * here, we only try to read as many attributes as + * were specified. + */ + for (i = 0; i < copy_attr_count && !done; i++) { + int m = attmap[i]; string = CopyReadAttribute(fp, &isnull, delim, &newline, null_print); - if (isnull) - { - /* already set values[i] and nulls[i] */ + + if( isnull ){ + /* nothing */ } else if (string == NULL) done = 1; /* end of file */ else { - values[i] = FunctionCall3(&in_functions[i], - CStringGetDatum(string), - ObjectIdGetDatum(elements[i]), - Int32GetDatum(attr[i]->atttypmod)); - nulls[i] = ' '; + values[m] = FunctionCall3(&in_functions[m], + CStringGetDatum(string), + ObjectIdGetDatum(elements[m]), + Int32GetDatum(attr[m]->atttypmod)); + nulls[m] = ' '; } } + + /* + * as above, we only try a default lookup if one is + * known to be available + */ + for (i = 0; i < def_attr_count && !done; i++){ + bool isnull; + values[defmap[i]] = ExecEvalExpr(defexprs[i],econtext,&isnull,&isdone); + if( ! isnull ) + nulls[defmap[i]] = ' '; + } if (!done) CopyReadNewline(fp, &newline); } @@ -975,7 +1067,7 @@ CopyFrom(Relation rel, bool binary, bool oids, FILE *fp, break; tuple = heap_formtuple(tupDesc, values, nulls); - + if (oids && file_has_oids) tuple->t_data->t_oid = loaded_oid; @@ -1021,12 +1113,6 @@ CopyFrom(Relation rel, bool binary, bool oids, FILE *fp, ExecARInsertTriggers(estate, resultRelInfo, tuple); } - for (i = 0; i < attr_count; i++) - { - if (!attr[i]->attbyval && nulls[i] != 'n') - pfree(DatumGetPointer(values[i])); - } - heap_freetuple(tuple); } @@ -1361,3 +1447,51 @@ CopyAttributeOut(FILE *fp, char *server_string, char *delim) pfree(string_start); /* pfree pg_server_to_client result */ #endif } + +/* + * CopyAssertAttlist: elog(ERROR,...) if the specified attlist + * is not valid for the Relation + */ +static void +CopyAssertAttlist(Relation rel, List* attlist, bool from) +{ + TupleDesc tupDesc; + List* cur; + char* illegalattname = NULL; + int attr_count; + const char* to_or_from; + + if( attlist == NIL ) + return; + + to_or_from = (from == true ? "FROM" : "TO"); + + tupDesc = RelationGetDescr(rel); + Assert(tupDesc != NULL); + + /* + * make sure there aren't more columns specified than are in the table + */ + attr_count = tupDesc->natts; + if( attr_count < length(attlist) ) + elog(ERROR,"More columns specified in COPY %s command than in target relation",to_or_from); + + /* + * make sure no columns are specified that don't exist in the table + */ + foreach(cur,attlist) + { + int found = 0; + int i = 0; + for(;i<attr_count;++i) + { + if( strcmp(strVal(lfirst(cur)),NameStr(tupDesc->attrs[i]->attname)) == 0) + ++found; + } + if( ! found ) + illegalattname = strVal(lfirst(cur)); + } + if( illegalattname ) + elog(ERROR,"Attribute referenced in COPY %s command does not exist: \"%s.%s\"",to_or_from,RelationGetRelationName(rel),illegalattname); +} + |