Diffstat (limited to 'contrib/file_fdw/file_fdw.c')
-rw-r--r--  contrib/file_fdw/file_fdw.c  248
1 file changed, 225 insertions(+), 23 deletions(-)
diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c
index e8907709bd9..30ed9fbad14 100644
--- a/contrib/file_fdw/file_fdw.c
+++ b/contrib/file_fdw/file_fdw.c
@@ -20,6 +20,7 @@
 #include "commands/copy.h"
 #include "commands/defrem.h"
 #include "commands/explain.h"
+#include "commands/vacuum.h"
 #include "foreign/fdwapi.h"
 #include "foreign/foreign.h"
 #include "miscadmin.h"
@@ -28,6 +29,7 @@
 #include "optimizer/pathnode.h"
 #include "optimizer/planmain.h"
 #include "optimizer/restrictinfo.h"
+#include "utils/memutils.h"
 #include "utils/rel.h"
 
 PG_MODULE_MAGIC;
@@ -123,6 +125,7 @@ static void fileBeginForeignScan(ForeignScanState *node, int eflags);
 static TupleTableSlot *fileIterateForeignScan(ForeignScanState *node);
 static void fileReScanForeignScan(ForeignScanState *node);
 static void fileEndForeignScan(ForeignScanState *node);
+static AcquireSampleRowsFunc fileAnalyzeForeignTable(Relation relation);
 
 /*
  * Helper functions
@@ -136,6 +139,10 @@ static void estimate_size(PlannerInfo *root, RelOptInfo *baserel,
 static void estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
                FileFdwPlanState *fdw_private,
                Cost *startup_cost, Cost *total_cost);
+static int file_acquire_sample_rows(Relation onerel, int elevel,
+                    HeapTuple *rows, int targrows,
+                    double *totalrows, double *totaldeadrows,
+                    BlockNumber *totalpages);
 
 
 /*
@@ -155,6 +162,7 @@ file_fdw_handler(PG_FUNCTION_ARGS)
     fdwroutine->IterateForeignScan = fileIterateForeignScan;
     fdwroutine->ReScanForeignScan = fileReScanForeignScan;
     fdwroutine->EndForeignScan = fileEndForeignScan;
+    fdwroutine->AnalyzeForeignTable = fileAnalyzeForeignTable;
 
     PG_RETURN_POINTER(fdwroutine);
 }
@@ -614,6 +622,23 @@ fileIterateForeignScan(ForeignScanState *node)
 }
 
 /*
+ * fileReScanForeignScan
+ *      Rescan table, possibly with new parameters
+ */
+static void
+fileReScanForeignScan(ForeignScanState *node)
+{
+    FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
+
+    EndCopyFrom(festate->cstate);
+
+    festate->cstate = BeginCopyFrom(node->ss.ss_currentRelation,
+                                    festate->filename,
+                                    NIL,
+                                    festate->options);
+}
+
+/*
  * fileEndForeignScan
  *      Finish scanning foreign table and dispose objects used for this scan
  */
@@ -628,20 +653,13 @@ fileEndForeignScan(ForeignScanState *node)
 }
 
 /*
- * fileReScanForeignScan
- *      Rescan table, possibly with new parameters
+ * fileAnalyzeForeignTable
+ *      Test whether analyzing this foreign table is supported
  */
-static void
-fileReScanForeignScan(ForeignScanState *node)
+static AcquireSampleRowsFunc
+fileAnalyzeForeignTable(Relation relation)
 {
-    FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
-
-    EndCopyFrom(festate->cstate);
-
-    festate->cstate = BeginCopyFrom(node->ss.ss_currentRelation,
-                                    festate->filename,
-                                    NIL,
-                                    festate->options);
+    return file_acquire_sample_rows;
 }
 
 /*
@@ -657,7 +675,6 @@ estimate_size(PlannerInfo *root, RelOptInfo *baserel,
 {
     struct stat stat_buf;
     BlockNumber pages;
-    int         tuple_width;
     double      ntuples;
     double      nrows;
 
@@ -674,26 +691,45 @@ estimate_size(PlannerInfo *root, RelOptInfo *baserel,
     pages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ;
     if (pages < 1)
         pages = 1;
-
     fdw_private->pages = pages;
 
     /*
-     * Estimate the number of tuples in the file.  We back into this estimate
-     * using the planner's idea of the relation width; which is bogus if not
-     * all columns are being read, not to mention that the text representation
-     * of a row probably isn't the same size as its internal representation.
-     * FIXME later.
+     * Estimate the number of tuples in the file.
      */
-    tuple_width = MAXALIGN(baserel->width) + MAXALIGN(sizeof(HeapTupleHeaderData));
+    if (baserel->pages > 0)
+    {
+        /*
+         * We have # of pages and # of tuples from pg_class (that is, from a
+         * previous ANALYZE), so compute a tuples-per-page estimate and scale
+         * that by the current file size.
+         */
+        double      density;
 
-    ntuples = clamp_row_est((double) stat_buf.st_size / (double) tuple_width);
-
+        density = baserel->tuples / (double) baserel->pages;
+        ntuples = clamp_row_est(density * (double) pages);
+    }
+    else
+    {
+        /*
+         * Otherwise we have to fake it.  We back into this estimate using the
+         * planner's idea of the relation width; which is bogus if not all
+         * columns are being read, not to mention that the text representation
+         * of a row probably isn't the same size as its internal
+         * representation.  Possibly we could do something better, but the
+         * real answer to anyone who complains is "ANALYZE" ...
+         */
+        int         tuple_width;
+
+        tuple_width = MAXALIGN(baserel->width) +
+            MAXALIGN(sizeof(HeapTupleHeaderData));
+        ntuples = clamp_row_est((double) stat_buf.st_size /
+                                (double) tuple_width);
+    }
     fdw_private->ntuples = ntuples;
 
     /*
      * Now estimate the number of rows returned by the scan after applying the
-     * baserestrictinfo quals.  This is pretty bogus too, since the planner
-     * will have no stats about the relation, but it's better than nothing.
+     * baserestrictinfo quals.
      */
     nrows = ntuples *
         clauselist_selectivity(root,
@@ -736,3 +772,169 @@ estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
     run_cost += cpu_per_tuple * ntuples;
     *total_cost = *startup_cost + run_cost;
 }
+
+/*
+ * file_acquire_sample_rows -- acquire a random sample of rows from the table
+ *
+ * Selected rows are returned in the caller-allocated array rows[],
+ * which must have at least targrows entries.
+ * The actual number of rows selected is returned as the function result.
+ * We also count the total number of rows in the file and return it into
+ * *totalrows, and return the file's physical size in *totalpages.
+ * Note that *totaldeadrows is always set to 0.
+ *
+ * Note that the returned list of rows is not always in order by physical
+ * position in the file.  Therefore, correlation estimates derived later
+ * may be meaningless, but it's OK because we don't use the estimates
+ * currently (the planner only pays attention to correlation for indexscans).
+ */
+static int
+file_acquire_sample_rows(Relation onerel, int elevel,
+                         HeapTuple *rows, int targrows,
+                         double *totalrows, double *totaldeadrows,
+                         BlockNumber *totalpages)
+{
+    int         numrows = 0;
+    double      rowstoskip = -1;    /* -1 means not set yet */
+    double      rstate;
+    TupleDesc   tupDesc;
+    Datum      *values;
+    bool       *nulls;
+    bool        found;
+    char       *filename;
+    struct stat stat_buf;
+    List       *options;
+    CopyState   cstate;
+    ErrorContextCallback errcontext;
+    MemoryContext oldcontext = CurrentMemoryContext;
+    MemoryContext tupcontext;
+
+    Assert(onerel);
+    Assert(targrows > 0);
+
+    tupDesc = RelationGetDescr(onerel);
+    values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
+    nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
+
+    /* Fetch options of foreign table */
+    fileGetOptions(RelationGetRelid(onerel), &filename, &options);
+
+    /*
+     * Get size of the file.
+     */
+    if (stat(filename, &stat_buf) < 0)
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not stat file \"%s\": %m",
+                        filename)));
+
+    /*
+     * Convert size to pages for use in I/O cost estimate.
+     */
+    *totalpages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ;
+    if (*totalpages < 1)
+        *totalpages = 1;
+
+    /*
+     * Create CopyState from FDW options.
+     */
+    cstate = BeginCopyFrom(onerel, filename, NIL, options);
+
+    /*
+     * Use per-tuple memory context to prevent leak of memory used to read rows
+     * from the file with Copy routines.
+     */
+    tupcontext = AllocSetContextCreate(CurrentMemoryContext,
+                                       "file_fdw temporary context",
+                                       ALLOCSET_DEFAULT_MINSIZE,
+                                       ALLOCSET_DEFAULT_INITSIZE,
+                                       ALLOCSET_DEFAULT_MAXSIZE);
+
+    /* Prepare for sampling rows */
+    rstate = anl_init_selection_state(targrows);
+
+    /* Set up callback to identify error line number. */
+    errcontext.callback = CopyFromErrorCallback;
+    errcontext.arg = (void *) cstate;
+    errcontext.previous = error_context_stack;
+    error_context_stack = &errcontext;
+
+    *totalrows = 0;
+    *totaldeadrows = 0;
+    for (;;)
+    {
+        /* Check for user-requested abort or sleep */
+        vacuum_delay_point();
+
+        /* Fetch next row */
+        MemoryContextReset(tupcontext);
+        MemoryContextSwitchTo(tupcontext);
+
+        found = NextCopyFrom(cstate, NULL, values, nulls, NULL);
+
+        MemoryContextSwitchTo(oldcontext);
+
+        if (!found)
+            break;
+
+        /*
+         * The first targrows sample rows are simply copied into the
+         * reservoir. Then we start replacing tuples in the sample until we
+         * reach the end of the relation. This algorithm is from Jeff Vitter's
+         * paper (see more info in commands/analyze.c).
+         */
+        if (numrows < targrows)
+        {
+            rows[numrows++] = heap_form_tuple(tupDesc, values, nulls);
+        }
+        else
+        {
+            /*
+             * t in Vitter's paper is the number of records already processed.
+             * If we need to compute a new S value, we must use the
+             * not-yet-incremented value of totalrows as t.
+             */
+            if (rowstoskip < 0)
+                rowstoskip = anl_get_next_S(*totalrows, targrows, &rstate);
+
+            if (rowstoskip <= 0)
+            {
+                /*
+                 * Found a suitable tuple, so save it, replacing one
+                 * old tuple at random
+                 */
+                int         k = (int) (targrows * anl_random_fract());
+
+                Assert(k >= 0 && k < targrows);
+                heap_freetuple(rows[k]);
+                rows[k] = heap_form_tuple(tupDesc, values, nulls);
+            }
+
+            rowstoskip -= 1;
+        }
+
+        *totalrows += 1;
+    }
+
+    /* Remove error callback. */
+    error_context_stack = errcontext.previous;
+
+    /* Clean up. */
+    MemoryContextDelete(tupcontext);
+
+    EndCopyFrom(cstate);
+
+    pfree(values);
+    pfree(nulls);
+
+    /*
+     * Emit some interesting relation info
+     */
+    ereport(elevel,
+            (errmsg("\"%s\": scanned %u pages containing %.0f rows; "
+                    "%d rows in sample",
+                    RelationGetRelationName(onerel),
+                    *totalpages, *totalrows, numrows)));
+
+    return numrows;
+}
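The two-branch row estimate added to estimate_size() can be sanity-checked with concrete numbers. The standalone sketch below is illustrative only: the file size, the pg_class figures, the tuple width, and the simplified clamp_row_est() stand-in are all assumed values, not PostgreSQL code.

#include <stdio.h>

#define BLCKSZ 8192             /* assuming the default block size */

/* Simplified stand-in for the planner's clamp_row_est():
 * round to nearest, with a floor of one row. */
static double
clamp_row_est(double nrows)
{
    return (nrows < 1.0) ? 1.0 : (double) (long long) (nrows + 0.5);
}

int
main(void)
{
    long long   file_size = 10485760;   /* current file size: 10 MB */
    double      rel_pages = 1000.0;     /* baserel->pages, from a prior ANALYZE */
    double      rel_tuples = 250000.0;  /* baserel->tuples, ditto */

    /* Convert byte size to pages, rounding up, with a floor of 1. */
    double      pages = (double) ((file_size + (BLCKSZ - 1)) / BLCKSZ);

    if (pages < 1)
        pages = 1;

    /* Stats available: scale tuples-per-page density by current size. */
    double      density = rel_tuples / rel_pages;   /* 250 rows/page */

    printf("with stats:    %.0f rows\n", clamp_row_est(density * pages));

    /* No stats: divide file size by an assumed tuple width. */
    int         tuple_width = 100;      /* hypothetical width + header overhead */

    printf("without stats: %.0f rows\n",
           clamp_row_est((double) file_size / (double) tuple_width));
    return 0;
}

With these numbers the file is 1280 pages, so the stats-based branch estimates 250 * 1280 = 320000 rows, while the width-based fallback guesses 10485760 / 100, about 104858 rows. The point of the new branch is that the density from the last ANALYZE keeps tracking the file as it grows or shrinks.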
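The sampling loop in file_acquire_sample_rows() is reservoir sampling: the first targrows tuples fill rows[], and each later tuple displaces a random reservoir entry with decreasing probability. The patch uses Vitter's Algorithm Z via the planner helpers anl_init_selection_state(), anl_get_next_S(), and anl_random_fract() from commands/analyze.c, which compute a skip count (rowstoskip) so that most rows never touch the random number generator. The sketch below yields the same sample distribution using the simpler Algorithm R and plain rand(); it is a standalone illustration, not the patch's code.

#include <stdio.h>
#include <stdlib.h>

/*
 * Reservoir-sample targrows items from a stream of unknown length.
 * The first targrows items fill the reservoir; afterwards the t-th
 * item (1-based) replaces a uniformly chosen slot with probability
 * targrows / t, which leaves every item equally likely to survive.
 */
static int
sample_stream(const int *stream, int nitems, int *rows, int targrows)
{
    int         numrows = 0;
    double      totalrows = 0;

    for (int i = 0; i < nitems; i++)
    {
        totalrows += 1;
        if (numrows < targrows)
            rows[numrows++] = stream[i];        /* fill the reservoir */
        else if (rand() < RAND_MAX * (targrows / totalrows))
        {
            /* replace one old entry at random, as the patch does */
            int         k = rand() % targrows;

            rows[k] = stream[i];
        }
    }
    return numrows;
}

int
main(void)
{
    int         stream[1000], rows[10];

    srand(42);
    for (int i = 0; i < 1000; i++)
        stream[i] = i;

    int         n = sample_stream(stream, 1000, rows, 10);

    for (int i = 0; i < n; i++)
        printf("%d ", rows[i]);
    printf("\n");
    return 0;
}

Because replacements land at random reservoir positions, the sample is not ordered by physical position in the file, which is why the function's header comment warns that correlation estimates derived from it would be meaningless.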
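The per-tuple memory context (tupcontext) is the standard PostgreSQL idiom for bounding memory in a long loop: allocate all per-row scratch data in a child context and reset it once per iteration instead of freeing piece by piece. In the patch, NextCopyFrom() runs inside tupcontext, while the sampled tuples are built with heap_form_tuple() only after switching back to the caller's context, so they survive the resets. A rough standalone analogue using a hypothetical bump arena (Arena, arena_alloc(), and arena_reset() are inventions for illustration, not PostgreSQL APIs):

#include <stdio.h>
#include <stdlib.h>

/* A trivial bump allocator: everything lives until arena_reset(). */
typedef struct
{
    char       *base;
    size_t      size;
    size_t      used;
} Arena;

static void *
arena_alloc(Arena *a, size_t n)
{
    if (a->used + n > a->size)
    {
        fprintf(stderr, "arena full\n");
        exit(1);
    }

    void       *p = a->base + a->used;

    a->used += n;
    return p;
}

static void
arena_reset(Arena *a)
{
    a->used = 0;            /* analogous to MemoryContextReset(tupcontext) */
}

int
main(void)
{
    Arena       a = {malloc(1 << 20), 1 << 20, 0};

    if (a.base == NULL)
        return 1;

    for (int row = 0; row < 1000000; row++)
    {
        arena_reset(&a);    /* per-row scratch vanishes here, not via free() */

        char       *buf = arena_alloc(&a, 256);

        snprintf(buf, 256, "row %d", row);
        /* ... parse and maybe sample buf; nothing accumulates across rows ... */
    }
    free(a.base);
    return 0;
}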