Diffstat (limited to 'contrib/file_fdw/file_fdw.c')
-rw-r--r--  contrib/file_fdw/file_fdw.c  248
1 file changed, 225 insertions(+), 23 deletions(-)
diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c
index e8907709bd9..30ed9fbad14 100644
--- a/contrib/file_fdw/file_fdw.c
+++ b/contrib/file_fdw/file_fdw.c
@@ -20,6 +20,7 @@
#include "commands/copy.h"
#include "commands/defrem.h"
#include "commands/explain.h"
+#include "commands/vacuum.h"
#include "foreign/fdwapi.h"
#include "foreign/foreign.h"
#include "miscadmin.h"
@@ -28,6 +29,7 @@
#include "optimizer/pathnode.h"
#include "optimizer/planmain.h"
#include "optimizer/restrictinfo.h"
+#include "utils/memutils.h"
#include "utils/rel.h"
PG_MODULE_MAGIC;
@@ -123,6 +125,7 @@ static void fileBeginForeignScan(ForeignScanState *node, int eflags);
static TupleTableSlot *fileIterateForeignScan(ForeignScanState *node);
static void fileReScanForeignScan(ForeignScanState *node);
static void fileEndForeignScan(ForeignScanState *node);
+static AcquireSampleRowsFunc fileAnalyzeForeignTable(Relation relation);
/*
* Helper functions
@@ -136,6 +139,10 @@ static void estimate_size(PlannerInfo *root, RelOptInfo *baserel,
static void estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
FileFdwPlanState *fdw_private,
Cost *startup_cost, Cost *total_cost);
+static int file_acquire_sample_rows(Relation onerel, int elevel,
+ HeapTuple *rows, int targrows,
+ double *totalrows, double *totaldeadrows,
+ BlockNumber *totalpages);
/*
@@ -155,6 +162,7 @@ file_fdw_handler(PG_FUNCTION_ARGS)
fdwroutine->IterateForeignScan = fileIterateForeignScan;
fdwroutine->ReScanForeignScan = fileReScanForeignScan;
fdwroutine->EndForeignScan = fileEndForeignScan;
+ fdwroutine->AnalyzeForeignTable = fileAnalyzeForeignTable;
PG_RETURN_POINTER(fdwroutine);
}
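
With AnalyzeForeignTable registered, ANALYZE on a file_fdw table can ask
the FDW for a row sampler. A minimal sketch of the caller's side, matching
the callback signature this patch introduces (illustrative only, not the
verbatim code in commands/analyze.c; the local variable names are
assumptions):

	FdwRoutine *fdwroutine = GetFdwRoutineForRelation(onerel, false);
	AcquireSampleRowsFunc acquirefunc = NULL;

	/* The FDW returns a sampler, or NULL to decline ANALYZE support. */
	if (fdwroutine->AnalyzeForeignTable != NULL)
		acquirefunc = fdwroutine->AnalyzeForeignTable(onerel);

	if (acquirefunc != NULL)
		numrows = (*acquirefunc) (onerel, elevel, rows, targrows,
								  &totalrows, &totaldeadrows, &totalpages);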
@@ -614,6 +622,23 @@ fileIterateForeignScan(ForeignScanState *node)
}
/*
+ * fileReScanForeignScan
+ * Rescan table, possibly with new parameters
+ */
+static void
+fileReScanForeignScan(ForeignScanState *node)
+{
+ FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
+
+ EndCopyFrom(festate->cstate);
+
+ festate->cstate = BeginCopyFrom(node->ss.ss_currentRelation,
+ festate->filename,
+ NIL,
+ festate->options);
+}
+
+/*
* fileEndForeignScan
* Finish scanning foreign table and dispose objects used for this scan
*/
@@ -628,20 +653,13 @@ fileEndForeignScan(ForeignScanState *node)
}
/*
- * fileReScanForeignScan
- * Rescan table, possibly with new parameters
+ * fileAnalyzeForeignTable
+ * Test whether analyzing this foreign table is supported
*/
-static void
-fileReScanForeignScan(ForeignScanState *node)
+static AcquireSampleRowsFunc
+fileAnalyzeForeignTable(Relation relation)
{
- FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
-
- EndCopyFrom(festate->cstate);
-
- festate->cstate = BeginCopyFrom(node->ss.ss_currentRelation,
- festate->filename,
- NIL,
- festate->options);
+ return file_acquire_sample_rows;
}
/*
@@ -657,7 +675,6 @@ estimate_size(PlannerInfo *root, RelOptInfo *baserel,
{
struct stat stat_buf;
BlockNumber pages;
- int tuple_width;
double ntuples;
double nrows;
@@ -674,26 +691,45 @@ estimate_size(PlannerInfo *root, RelOptInfo *baserel,
pages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ;
if (pages < 1)
pages = 1;
-
fdw_private->pages = pages;
/*
- * Estimate the number of tuples in the file. We back into this estimate
- * using the planner's idea of the relation width; which is bogus if not
- * all columns are being read, not to mention that the text representation
- * of a row probably isn't the same size as its internal representation.
- * FIXME later.
+ * Estimate the number of tuples in the file.
*/
- tuple_width = MAXALIGN(baserel->width) + MAXALIGN(sizeof(HeapTupleHeaderData));
+ if (baserel->pages > 0)
+ {
+ /*
+ * We have # of pages and # of tuples from pg_class (that is, from a
+ * previous ANALYZE), so compute a tuples-per-page estimate and scale
+ * that by the current file size.
+ */
+ double density;
- ntuples = clamp_row_est((double) stat_buf.st_size / (double) tuple_width);
+ density = baserel->tuples / (double) baserel->pages;
+ ntuples = clamp_row_est(density * (double) pages);
+ }
+ else
+ {
+ /*
+ * Otherwise we have to fake it. We back into this estimate using the
+ * planner's idea of the relation width; which is bogus if not all
+ * columns are being read, not to mention that the text representation
+ * of a row probably isn't the same size as its internal
+ * representation. Possibly we could do something better, but the
+ * real answer to anyone who complains is "ANALYZE" ...
+ */
+ int tuple_width;
+ tuple_width = MAXALIGN(baserel->width) +
+ MAXALIGN(sizeof(HeapTupleHeaderData));
+ ntuples = clamp_row_est((double) stat_buf.st_size /
+ (double) tuple_width);
+ }
fdw_private->ntuples = ntuples;
/*
* Now estimate the number of rows returned by the scan after applying the
- * baserestrictinfo quals. This is pretty bogus too, since the planner
- * will have no stats about the relation, but it's better than nothing.
+ * baserestrictinfo quals.
*/
nrows = ntuples *
clauselist_selectivity(root,
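
To make the two estimation paths concrete, here is a self-contained sketch
of the same arithmetic (the function name and the width fallback are
illustrative, not part of the patch):

	#include <stddef.h>

	#define BLCKSZ 8192				/* PostgreSQL's default block size */

	/* Hypothetical restatement of estimate_size()'s row-count logic. */
	static double
	estimate_file_ntuples(size_t file_size, double rel_pages,
						  double rel_tuples, int est_tuple_width)
	{
		double		pages = (double) ((file_size + (BLCKSZ - 1)) / BLCKSZ);

		if (pages < 1)
			pages = 1;

		if (rel_pages > 0)		/* stats from a previous ANALYZE exist */
			return (rel_tuples / rel_pages) * pages;

		/* no stats yet: back into a guess from the assumed tuple width */
		return (double) file_size / est_tuple_width;
	}

For example, if a previous ANALYZE recorded 100 pages and 1000 tuples
(density 10 tuples/page) and the file has since grown to 128 blocks, the
new estimate is 10 * 128 = 1280 tuples.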
@@ -736,3 +772,169 @@ estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
run_cost += cpu_per_tuple * ntuples;
*total_cost = *startup_cost + run_cost;
}
+
+/*
+ * file_acquire_sample_rows -- acquire a random sample of rows from the table
+ *
+ * Selected rows are returned in the caller-allocated array rows[],
+ * which must have at least targrows entries.
+ * The actual number of rows selected is returned as the function result.
+ * We also count the total number of rows in the file and return it into
+ * *totalrows, and return the file's physical size in *totalpages.
+ * Note that *totaldeadrows is always set to 0.
+ *
+ * Note that the returned list of rows is not always in order by physical
+ * position in the file. Therefore, correlation estimates derived later
+ * may be meaningless, but it's OK because we don't use the estimates
+ * currently (the planner only pays attention to correlation for indexscans).
+ */
+static int
+file_acquire_sample_rows(Relation onerel, int elevel,
+ HeapTuple *rows, int targrows,
+ double *totalrows, double *totaldeadrows,
+ BlockNumber *totalpages)
+{
+ int numrows = 0;
+ double rowstoskip = -1; /* -1 means not set yet */
+ double rstate;
+ TupleDesc tupDesc;
+ Datum *values;
+ bool *nulls;
+ bool found;
+ char *filename;
+ struct stat stat_buf;
+ List *options;
+ CopyState cstate;
+ ErrorContextCallback errcontext;
+ MemoryContext oldcontext = CurrentMemoryContext;
+ MemoryContext tupcontext;
+
+ Assert(onerel);
+ Assert(targrows > 0);
+
+ tupDesc = RelationGetDescr(onerel);
+ values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
+ nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
+
+ /* Fetch options of foreign table */
+ fileGetOptions(RelationGetRelid(onerel), &filename, &options);
+
+ /*
+ * Get size of the file.
+ */
+ if (stat(filename, &stat_buf) < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not stat file \"%s\": %m",
+ filename)));
+
+ /*
+ * Convert size to pages for use in I/O cost estimate.
+ */
+ *totalpages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ;
+ if (*totalpages < 1)
+ *totalpages = 1;
+
+ /*
+ * Create CopyState from FDW options.
+ */
+ cstate = BeginCopyFrom(onerel, filename, NIL, options);
+
+ /*
+ * Use per-tuple memory context to prevent leak of memory used to read rows
+ * from the file with Copy routines.
+ */
+ tupcontext = AllocSetContextCreate(CurrentMemoryContext,
+ "file_fdw temporary context",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+ /* Prepare for sampling rows */
+ rstate = anl_init_selection_state(targrows);
+
+ /* Set up callback to identify error line number. */
+ errcontext.callback = CopyFromErrorCallback;
+ errcontext.arg = (void *) cstate;
+ errcontext.previous = error_context_stack;
+ error_context_stack = &errcontext;
+
+ *totalrows = 0;
+ *totaldeadrows = 0;
+ for (;;)
+ {
+ /* Check for user-requested abort or sleep */
+ vacuum_delay_point();
+
+ /* Fetch next row */
+ MemoryContextReset(tupcontext);
+ MemoryContextSwitchTo(tupcontext);
+
+ found = NextCopyFrom(cstate, NULL, values, nulls, NULL);
+
+ MemoryContextSwitchTo(oldcontext);
+
+ if (!found)
+ break;
+
+ /*
+ * The first targrows sample rows are simply copied into the
+ * reservoir. Then we start replacing tuples in the sample until we
+ * reach the end of the relation. This algorithm is from Jeff Vitter's
+ * paper (see more info in commands/analyze.c).
+ */
+ if (numrows < targrows)
+ {
+ rows[numrows++] = heap_form_tuple(tupDesc, values, nulls);
+ }
+ else
+ {
+ /*
+ * t in Vitter's paper is the number of records already processed.
+ * If we need to compute a new S value, we must use the
+ * not-yet-incremented value of totalrows as t.
+ */
+ if (rowstoskip < 0)
+ rowstoskip = anl_get_next_S(*totalrows, targrows, &rstate);
+
+ if (rowstoskip <= 0)
+ {
+ /*
+ * Found a suitable tuple, so save it, replacing one
+ * old tuple at random
+ */
+ int k = (int) (targrows * anl_random_fract());
+
+ Assert(k >= 0 && k < targrows);
+ heap_freetuple(rows[k]);
+ rows[k] = heap_form_tuple(tupDesc, values, nulls);
+ }
+
+ rowstoskip -= 1;
+ }
+
+ *totalrows += 1;
+ }
+
+ /* Remove error callback. */
+ error_context_stack = errcontext.previous;
+
+ /* Clean up. */
+ MemoryContextDelete(tupcontext);
+
+ EndCopyFrom(cstate);
+
+ pfree(values);
+ pfree(nulls);
+
+ /*
+ * Emit some interesting relation info
+ */
+ ereport(elevel,
+ (errmsg("\"%s\": scanned %u pages containing %.0f rows; "
+ "%d rows in sample",
+ RelationGetRelationName(onerel),
+ *totalpages, *totalrows, numrows)));
+
+ return numrows;
+}
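
The sampling loop above uses Vitter's skip-distance method (Algorithm Z,
via anl_get_next_S and rowstoskip), which produces the same distribution
as the simpler Algorithm R sketched below. The sketch is standalone and
for illustration only: it samples ints rather than HeapTuples, and every
name in it is hypothetical.

	#include <stdlib.h>

	/*
	 * Reservoir-sample up to targrows items from stream[]; returns the
	 * number actually kept.
	 */
	static int
	sample_rows(const int *stream, int nstream, int *reservoir, int targrows)
	{
		int			numrows = 0;
		double		t = 0;		/* rows processed so far */
		int			i;

		for (i = 0; i < nstream; i++)
		{
			if (numrows < targrows)
				reservoir[numrows++] = stream[i];	/* fill the reservoir */
			else if (((double) rand() / RAND_MAX) * (t + 1) < targrows)
			{
				/*
				 * Keep this row with probability targrows/(t+1), evicting
				 * a victim chosen uniformly at random.
				 */
				reservoir[rand() % targrows] = stream[i];
			}
			t += 1;
		}
		return numrows;
	}

Algorithm Z avoids one random draw per input row by instead drawing how
many rows to skip before the next replacement; that skip count is what
rowstoskip tracks in file_acquire_sample_rows.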