summaryrefslogtreecommitdiff
path: root/src/backend/commands/copy.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/copy.c')
-rw-r--r--src/backend/commands/copy.c166
1 files changed, 153 insertions, 13 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index e0c72d9cdc1..7637b689dba 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -33,6 +33,7 @@
#include "libpq/pqformat.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
+#include "optimizer/clauses.h"
#include "optimizer/planner.h"
#include "parser/parse_relation.h"
#include "rewrite/rewriteHandler.h"
@@ -149,6 +150,7 @@ typedef struct CopyStateData
Oid *typioparams; /* array of element types for in_functions */
int *defmap; /* array of default att numbers */
ExprState **defexprs; /* array of default att expressions */
+ bool volatile_defexprs; /* is any of defexprs volatile? */
/*
* These variables are used to reduce overhead in textual COPY FROM.
@@ -277,6 +279,11 @@ static uint64 CopyTo(CopyState cstate);
static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
Datum *values, bool *nulls);
static uint64 CopyFrom(CopyState cstate);
+static void CopyFromInsertBatch(CopyState cstate, EState *estate,
+ CommandId mycid, int hi_options,
+ ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
+ BulkInsertState bistate,
+ int nBufferedTuples, HeapTuple *bufferedTuples);
static bool CopyReadLine(CopyState cstate);
static bool CopyReadLineText(CopyState cstate);
static int CopyReadAttributesText(CopyState cstate);
@@ -1842,11 +1849,17 @@ CopyFrom(CopyState cstate)
ExprContext *econtext;
TupleTableSlot *myslot;
MemoryContext oldcontext = CurrentMemoryContext;
+
ErrorContextCallback errcontext;
CommandId mycid = GetCurrentCommandId(true);
int hi_options = 0; /* start with default heap_insert options */
BulkInsertState bistate;
uint64 processed = 0;
+ bool useHeapMultiInsert;
+ int nBufferedTuples = 0;
+#define MAX_BUFFERED_TUPLES 1000
+ HeapTuple *bufferedTuples;
+ Size bufferedTuplesSize = 0;
Assert(cstate->rel);
@@ -1941,6 +1954,28 @@ CopyFrom(CopyState cstate)
/* Triggers might need a slot as well */
estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
+ /*
+ * It's more efficient to prepare a bunch of tuples for insertion, and
+ * insert them in one heap_multi_insert() call, than call heap_insert()
+ * separately for every tuple. However, we can't do that if there are
+ * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
+ * expressions. Such triggers or expressions might query the table we're
+ * inserting to, and act differently if the tuples that have already been
+ * processed and prepared for insertion are not there.
+ */
+ if ((resultRelInfo->ri_TrigDesc != NULL &&
+ (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
+ resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
+ cstate->volatile_defexprs)
+ {
+ useHeapMultiInsert = false;
+ }
+ else
+ {
+ useHeapMultiInsert = true;
+ bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
+ }
+
/* Prepare to catch AFTER triggers. */
AfterTriggerBeginQuery();
@@ -1972,8 +2007,15 @@ CopyFrom(CopyState cstate)
CHECK_FOR_INTERRUPTS();
- /* Reset the per-tuple exprcontext */
- ResetPerTupleExprContext(estate);
+ if (nBufferedTuples == 0)
+ {
+ /*
+ * Reset the per-tuple exprcontext. We can only do this if the
+ * tuple buffer is empty (calling the context the per-tuple memory
+ * context is a bit of a misnomer now
+ */
+ ResetPerTupleExprContext(estate);
+ }
/* Switch into its memory context */
MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
@@ -2010,24 +2052,49 @@ CopyFrom(CopyState cstate)
if (!skip_tuple)
{
- List *recheckIndexes = NIL;
-
/* Check the constraints of the tuple */
if (cstate->rel->rd_att->constr)
ExecConstraints(resultRelInfo, slot, estate);
- /* OK, store the tuple and create index entries for it */
- heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
+ if (useHeapMultiInsert)
+ {
+ /* Add this tuple to the tuple buffer */
+ bufferedTuples[nBufferedTuples++] = tuple;
+ bufferedTuplesSize += tuple->t_len;
- if (resultRelInfo->ri_NumIndices > 0)
- recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
- estate);
+ /*
+ * If the buffer filled up, flush it. Also flush if the total
+ * size of all the tuples in the buffer becomes large, to
+ * avoid using large amounts of memory for the buffers when
+ * the tuples are exceptionally wide.
+ */
+ if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
+ bufferedTuplesSize > 65535)
+ {
+ CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+ resultRelInfo, myslot, bistate,
+ nBufferedTuples, bufferedTuples);
+ nBufferedTuples = 0;
+ bufferedTuplesSize = 0;
+ }
+ }
+ else
+ {
+ List *recheckIndexes = NIL;
- /* AFTER ROW INSERT Triggers */
- ExecARInsertTriggers(estate, resultRelInfo, tuple,
- recheckIndexes);
+ /* OK, store the tuple and create index entries for it */
+ heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
- list_free(recheckIndexes);
+ if (resultRelInfo->ri_NumIndices > 0)
+ recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
+ estate);
+
+ /* AFTER ROW INSERT Triggers */
+ ExecARInsertTriggers(estate, resultRelInfo, tuple,
+ recheckIndexes);
+
+ list_free(recheckIndexes);
+ }
/*
* We count only tuples not suppressed by a BEFORE INSERT trigger;
@@ -2038,6 +2105,12 @@ CopyFrom(CopyState cstate)
}
}
+ /* Flush any remaining buffered tuples */
+ if (nBufferedTuples > 0)
+ CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+ resultRelInfo, myslot, bistate,
+ nBufferedTuples, bufferedTuples);
+
/* Done, clean up */
error_context_stack = errcontext.previous;
@@ -2071,6 +2144,67 @@ CopyFrom(CopyState cstate)
}
/*
+ * A subroutine of CopyFrom, to write the current batch of buffered heap
+ * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT
+ * triggers.
+ */
+static void
+CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
+ int hi_options, ResultRelInfo *resultRelInfo,
+ TupleTableSlot *myslot, BulkInsertState bistate,
+ int nBufferedTuples, HeapTuple *bufferedTuples)
+{
+ MemoryContext oldcontext;
+ int i;
+
+ /*
+ * heap_multi_insert leaks memory, so switch to short-lived memory
+ * context before calling it.
+ */
+ oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+ heap_multi_insert(cstate->rel,
+ bufferedTuples,
+ nBufferedTuples,
+ mycid,
+ hi_options,
+ bistate);
+ MemoryContextSwitchTo(oldcontext);
+
+ /*
+ * If there are any indexes, update them for all the inserted tuples,
+ * and run AFTER ROW INSERT triggers.
+ */
+ if (resultRelInfo->ri_NumIndices > 0)
+ {
+ for (i = 0; i < nBufferedTuples; i++)
+ {
+ List *recheckIndexes;
+
+ ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false);
+ recheckIndexes =
+ ExecInsertIndexTuples(myslot, &(bufferedTuples[i]->t_self),
+ estate);
+ ExecARInsertTriggers(estate, resultRelInfo,
+ bufferedTuples[i],
+ recheckIndexes);
+ list_free(recheckIndexes);
+ }
+ }
+ /*
+ * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
+ * anyway.
+ */
+ else if (resultRelInfo->ri_TrigDesc != NULL &&
+ resultRelInfo->ri_TrigDesc->trig_insert_after_row)
+ {
+ for (i = 0; i < nBufferedTuples; i++)
+ ExecARInsertTriggers(estate, resultRelInfo,
+ bufferedTuples[i],
+ NIL);
+ }
+}
+
+/*
* Setup to read tuples from a file for COPY FROM.
*
* 'rel': Used as a template for the tuples
@@ -2099,6 +2233,7 @@ BeginCopyFrom(Relation rel,
int *defmap;
ExprState **defexprs;
MemoryContext oldcontext;
+ bool volatile_defexprs;
cstate = BeginCopy(true, rel, NULL, NULL, attnamelist, options);
oldcontext = MemoryContextSwitchTo(cstate->copycontext);
@@ -2122,6 +2257,7 @@ BeginCopyFrom(Relation rel,
attr = tupDesc->attrs;
num_phys_attrs = tupDesc->natts;
num_defaults = 0;
+ volatile_defexprs = false;
/*
* Pick up the required catalog information for each attribute in the
@@ -2163,6 +2299,9 @@ BeginCopyFrom(Relation rel,
expression_planner((Expr *) defexpr), NULL);
defmap[num_defaults] = attnum - 1;
num_defaults++;
+
+ if (!volatile_defexprs)
+ volatile_defexprs = contain_volatile_functions(defexpr);
}
}
}
@@ -2172,6 +2311,7 @@ BeginCopyFrom(Relation rel,
cstate->typioparams = typioparams;
cstate->defmap = defmap;
cstate->defexprs = defexprs;
+ cstate->volatile_defexprs = volatile_defexprs;
cstate->num_defaults = num_defaults;
if (pipe)