diff options
Diffstat (limited to 'src/backend/commands/copy.c')
-rw-r--r-- | src/backend/commands/copy.c | 166 |
1 files changed, 153 insertions, 13 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index e0c72d9cdc1..7637b689dba 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -33,6 +33,7 @@ #include "libpq/pqformat.h" #include "mb/pg_wchar.h" #include "miscadmin.h" +#include "optimizer/clauses.h" #include "optimizer/planner.h" #include "parser/parse_relation.h" #include "rewrite/rewriteHandler.h" @@ -149,6 +150,7 @@ typedef struct CopyStateData Oid *typioparams; /* array of element types for in_functions */ int *defmap; /* array of default att numbers */ ExprState **defexprs; /* array of default att expressions */ + bool volatile_defexprs; /* is any of defexprs volatile? */ /* * These variables are used to reduce overhead in textual COPY FROM. @@ -277,6 +279,11 @@ static uint64 CopyTo(CopyState cstate); static void CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls); static uint64 CopyFrom(CopyState cstate); +static void CopyFromInsertBatch(CopyState cstate, EState *estate, + CommandId mycid, int hi_options, + ResultRelInfo *resultRelInfo, TupleTableSlot *myslot, + BulkInsertState bistate, + int nBufferedTuples, HeapTuple *bufferedTuples); static bool CopyReadLine(CopyState cstate); static bool CopyReadLineText(CopyState cstate); static int CopyReadAttributesText(CopyState cstate); @@ -1842,11 +1849,17 @@ CopyFrom(CopyState cstate) ExprContext *econtext; TupleTableSlot *myslot; MemoryContext oldcontext = CurrentMemoryContext; + ErrorContextCallback errcontext; CommandId mycid = GetCurrentCommandId(true); int hi_options = 0; /* start with default heap_insert options */ BulkInsertState bistate; uint64 processed = 0; + bool useHeapMultiInsert; + int nBufferedTuples = 0; +#define MAX_BUFFERED_TUPLES 1000 + HeapTuple *bufferedTuples; + Size bufferedTuplesSize = 0; Assert(cstate->rel); @@ -1941,6 +1954,28 @@ CopyFrom(CopyState cstate) /* Triggers might need a slot as well */ estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate); + /* + * It's more efficient to prepare a bunch of tuples for insertion, and + * insert them in one heap_multi_insert() call, than call heap_insert() + * separately for every tuple. However, we can't do that if there are + * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default + * expressions. Such triggers or expressions might query the table we're + * inserting to, and act differently if the tuples that have already been + * processed and prepared for insertion are not there. + */ + if ((resultRelInfo->ri_TrigDesc != NULL && + (resultRelInfo->ri_TrigDesc->trig_insert_before_row || + resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) || + cstate->volatile_defexprs) + { + useHeapMultiInsert = false; + } + else + { + useHeapMultiInsert = true; + bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple)); + } + /* Prepare to catch AFTER triggers. */ AfterTriggerBeginQuery(); @@ -1972,8 +2007,15 @@ CopyFrom(CopyState cstate) CHECK_FOR_INTERRUPTS(); - /* Reset the per-tuple exprcontext */ - ResetPerTupleExprContext(estate); + if (nBufferedTuples == 0) + { + /* + * Reset the per-tuple exprcontext. We can only do this if the + * tuple buffer is empty (calling the context the per-tuple memory + * context is a bit of a misnomer now + */ + ResetPerTupleExprContext(estate); + } /* Switch into its memory context */ MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); @@ -2010,24 +2052,49 @@ CopyFrom(CopyState cstate) if (!skip_tuple) { - List *recheckIndexes = NIL; - /* Check the constraints of the tuple */ if (cstate->rel->rd_att->constr) ExecConstraints(resultRelInfo, slot, estate); - /* OK, store the tuple and create index entries for it */ - heap_insert(cstate->rel, tuple, mycid, hi_options, bistate); + if (useHeapMultiInsert) + { + /* Add this tuple to the tuple buffer */ + bufferedTuples[nBufferedTuples++] = tuple; + bufferedTuplesSize += tuple->t_len; - if (resultRelInfo->ri_NumIndices > 0) - recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self), - estate); + /* + * If the buffer filled up, flush it. Also flush if the total + * size of all the tuples in the buffer becomes large, to + * avoid using large amounts of memory for the buffers when + * the tuples are exceptionally wide. + */ + if (nBufferedTuples == MAX_BUFFERED_TUPLES || + bufferedTuplesSize > 65535) + { + CopyFromInsertBatch(cstate, estate, mycid, hi_options, + resultRelInfo, myslot, bistate, + nBufferedTuples, bufferedTuples); + nBufferedTuples = 0; + bufferedTuplesSize = 0; + } + } + else + { + List *recheckIndexes = NIL; - /* AFTER ROW INSERT Triggers */ - ExecARInsertTriggers(estate, resultRelInfo, tuple, - recheckIndexes); + /* OK, store the tuple and create index entries for it */ + heap_insert(cstate->rel, tuple, mycid, hi_options, bistate); - list_free(recheckIndexes); + if (resultRelInfo->ri_NumIndices > 0) + recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self), + estate); + + /* AFTER ROW INSERT Triggers */ + ExecARInsertTriggers(estate, resultRelInfo, tuple, + recheckIndexes); + + list_free(recheckIndexes); + } /* * We count only tuples not suppressed by a BEFORE INSERT trigger; @@ -2038,6 +2105,12 @@ CopyFrom(CopyState cstate) } } + /* Flush any remaining buffered tuples */ + if (nBufferedTuples > 0) + CopyFromInsertBatch(cstate, estate, mycid, hi_options, + resultRelInfo, myslot, bistate, + nBufferedTuples, bufferedTuples); + /* Done, clean up */ error_context_stack = errcontext.previous; @@ -2071,6 +2144,67 @@ CopyFrom(CopyState cstate) } /* + * A subroutine of CopyFrom, to write the current batch of buffered heap + * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT + * triggers. + */ +static void +CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid, + int hi_options, ResultRelInfo *resultRelInfo, + TupleTableSlot *myslot, BulkInsertState bistate, + int nBufferedTuples, HeapTuple *bufferedTuples) +{ + MemoryContext oldcontext; + int i; + + /* + * heap_multi_insert leaks memory, so switch to short-lived memory + * context before calling it. + */ + oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + heap_multi_insert(cstate->rel, + bufferedTuples, + nBufferedTuples, + mycid, + hi_options, + bistate); + MemoryContextSwitchTo(oldcontext); + + /* + * If there are any indexes, update them for all the inserted tuples, + * and run AFTER ROW INSERT triggers. + */ + if (resultRelInfo->ri_NumIndices > 0) + { + for (i = 0; i < nBufferedTuples; i++) + { + List *recheckIndexes; + + ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false); + recheckIndexes = + ExecInsertIndexTuples(myslot, &(bufferedTuples[i]->t_self), + estate); + ExecARInsertTriggers(estate, resultRelInfo, + bufferedTuples[i], + recheckIndexes); + list_free(recheckIndexes); + } + } + /* + * There's no indexes, but see if we need to run AFTER ROW INSERT triggers + * anyway. + */ + else if (resultRelInfo->ri_TrigDesc != NULL && + resultRelInfo->ri_TrigDesc->trig_insert_after_row) + { + for (i = 0; i < nBufferedTuples; i++) + ExecARInsertTriggers(estate, resultRelInfo, + bufferedTuples[i], + NIL); + } +} + +/* * Setup to read tuples from a file for COPY FROM. * * 'rel': Used as a template for the tuples @@ -2099,6 +2233,7 @@ BeginCopyFrom(Relation rel, int *defmap; ExprState **defexprs; MemoryContext oldcontext; + bool volatile_defexprs; cstate = BeginCopy(true, rel, NULL, NULL, attnamelist, options); oldcontext = MemoryContextSwitchTo(cstate->copycontext); @@ -2122,6 +2257,7 @@ BeginCopyFrom(Relation rel, attr = tupDesc->attrs; num_phys_attrs = tupDesc->natts; num_defaults = 0; + volatile_defexprs = false; /* * Pick up the required catalog information for each attribute in the @@ -2163,6 +2299,9 @@ BeginCopyFrom(Relation rel, expression_planner((Expr *) defexpr), NULL); defmap[num_defaults] = attnum - 1; num_defaults++; + + if (!volatile_defexprs) + volatile_defexprs = contain_volatile_functions(defexpr); } } } @@ -2172,6 +2311,7 @@ BeginCopyFrom(Relation rel, cstate->typioparams = typioparams; cstate->defmap = defmap; cstate->defexprs = defexprs; + cstate->volatile_defexprs = volatile_defexprs; cstate->num_defaults = num_defaults; if (pipe) |