summaryrefslogtreecommitdiff
path: root/src/backend/access/nbtree/nbtutils.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/nbtree/nbtutils.c')
-rw-r--r--src/backend/access/nbtree/nbtutils.c756
1 files changed, 680 insertions, 76 deletions
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index 2aee9bbf67d..108030a8ee7 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -30,6 +30,17 @@
static inline int32 _bt_compare_array_skey(FmgrInfo *orderproc,
Datum tupdatum, bool tupnull,
Datum arrdatum, ScanKey cur);
+static void _bt_binsrch_skiparray_skey(bool cur_elem_trig, ScanDirection dir,
+ Datum tupdatum, bool tupnull,
+ BTArrayKeyInfo *array, ScanKey cur,
+ int32 *set_elem_result);
+static void _bt_skiparray_set_element(Relation rel, ScanKey skey, BTArrayKeyInfo *array,
+ int32 set_elem_result, Datum tupdatum, bool tupnull);
+static void _bt_skiparray_set_isnull(Relation rel, ScanKey skey, BTArrayKeyInfo *array);
+static void _bt_array_set_low_or_high(Relation rel, ScanKey skey,
+ BTArrayKeyInfo *array, bool low_not_high);
+static bool _bt_array_decrement(Relation rel, ScanKey skey, BTArrayKeyInfo *array);
+static bool _bt_array_increment(Relation rel, ScanKey skey, BTArrayKeyInfo *array);
static bool _bt_advance_array_keys_increment(IndexScanDesc scan, ScanDirection dir);
static void _bt_rewind_nonrequired_arrays(IndexScanDesc scan, ScanDirection dir);
static bool _bt_tuple_before_array_skeys(IndexScanDesc scan, ScanDirection dir,
@@ -207,6 +218,7 @@ _bt_compare_array_skey(FmgrInfo *orderproc,
int32 result = 0;
Assert(cur->sk_strategy == BTEqualStrategyNumber);
+ Assert(!(cur->sk_flags & (SK_BT_MINVAL | SK_BT_MAXVAL)));
if (tupnull) /* NULL tupdatum */
{
@@ -283,6 +295,8 @@ _bt_binsrch_array_skey(FmgrInfo *orderproc,
Datum arrdatum;
Assert(cur->sk_flags & SK_SEARCHARRAY);
+ Assert(!(cur->sk_flags & SK_BT_SKIP));
+ Assert(!(cur->sk_flags & SK_ISNULL)); /* SAOP arrays never have NULLs */
Assert(cur->sk_strategy == BTEqualStrategyNumber);
if (cur_elem_trig)
@@ -406,6 +420,186 @@ _bt_binsrch_array_skey(FmgrInfo *orderproc,
}
/*
+ * _bt_binsrch_skiparray_skey() -- "Binary search" within a skip array
+ *
+ * Does not return an index into the array, since skip arrays don't really
+ * contain elements (they generate their array elements procedurally instead).
+ * Our interface matches that of _bt_binsrch_array_skey in every other way.
+ *
+ * Sets *set_elem_result just like _bt_binsrch_array_skey would with a true
+ * array. The value 0 indicates that tupdatum/tupnull is within the range of
+ * the skip array. We return -1 when tupdatum/tupnull is lower that any value
+ * within the range of the array, and 1 when it is higher than every value.
+ * Caller should pass *set_elem_result to _bt_skiparray_set_element to advance
+ * the array.
+ *
+ * cur_elem_trig indicates if array advancement was triggered by this array's
+ * scan key. We use this to optimize-away comparisons that are known by our
+ * caller to be unnecessary from context, just like _bt_binsrch_array_skey.
+ */
+static void
+_bt_binsrch_skiparray_skey(bool cur_elem_trig, ScanDirection dir,
+ Datum tupdatum, bool tupnull,
+ BTArrayKeyInfo *array, ScanKey cur,
+ int32 *set_elem_result)
+{
+ Assert(cur->sk_flags & SK_BT_SKIP);
+ Assert(cur->sk_flags & SK_SEARCHARRAY);
+ Assert(cur->sk_flags & SK_BT_REQFWD);
+ Assert(array->num_elems == -1);
+ Assert(!ScanDirectionIsNoMovement(dir));
+
+ if (array->null_elem)
+ {
+ Assert(!array->low_compare && !array->high_compare);
+
+ *set_elem_result = 0;
+ return;
+ }
+
+ if (tupnull) /* NULL tupdatum */
+ {
+ if (cur->sk_flags & SK_BT_NULLS_FIRST)
+ *set_elem_result = -1; /* NULL "<" NOT_NULL */
+ else
+ *set_elem_result = 1; /* NULL ">" NOT_NULL */
+ return;
+ }
+
+ /*
+ * Array inequalities determine whether tupdatum is within the range of
+ * caller's skip array
+ */
+ *set_elem_result = 0;
+ if (ScanDirectionIsForward(dir))
+ {
+ /*
+ * Evaluate low_compare first (unless cur_elem_trig tells us that it
+ * cannot possibly fail to be satisfied), then evaluate high_compare
+ */
+ if (!cur_elem_trig && array->low_compare &&
+ !DatumGetBool(FunctionCall2Coll(&array->low_compare->sk_func,
+ array->low_compare->sk_collation,
+ tupdatum,
+ array->low_compare->sk_argument)))
+ *set_elem_result = -1;
+ else if (array->high_compare &&
+ !DatumGetBool(FunctionCall2Coll(&array->high_compare->sk_func,
+ array->high_compare->sk_collation,
+ tupdatum,
+ array->high_compare->sk_argument)))
+ *set_elem_result = 1;
+ }
+ else
+ {
+ /*
+ * Evaluate high_compare first (unless cur_elem_trig tells us that it
+ * cannot possibly fail to be satisfied), then evaluate low_compare
+ */
+ if (!cur_elem_trig && array->high_compare &&
+ !DatumGetBool(FunctionCall2Coll(&array->high_compare->sk_func,
+ array->high_compare->sk_collation,
+ tupdatum,
+ array->high_compare->sk_argument)))
+ *set_elem_result = 1;
+ else if (array->low_compare &&
+ !DatumGetBool(FunctionCall2Coll(&array->low_compare->sk_func,
+ array->low_compare->sk_collation,
+ tupdatum,
+ array->low_compare->sk_argument)))
+ *set_elem_result = -1;
+ }
+
+ /*
+ * Assert that any keys that were assumed to be satisfied already (due to
+ * caller passing cur_elem_trig=true) really are satisfied as expected
+ */
+#ifdef USE_ASSERT_CHECKING
+ if (cur_elem_trig)
+ {
+ if (ScanDirectionIsForward(dir) && array->low_compare)
+ Assert(DatumGetBool(FunctionCall2Coll(&array->low_compare->sk_func,
+ array->low_compare->sk_collation,
+ tupdatum,
+ array->low_compare->sk_argument)));
+
+ if (ScanDirectionIsBackward(dir) && array->high_compare)
+ Assert(DatumGetBool(FunctionCall2Coll(&array->high_compare->sk_func,
+ array->high_compare->sk_collation,
+ tupdatum,
+ array->high_compare->sk_argument)));
+ }
+#endif
+}
+
+/*
+ * _bt_skiparray_set_element() -- Set skip array scan key's sk_argument
+ *
+ * Caller passes set_elem_result returned by _bt_binsrch_skiparray_skey for
+ * caller's tupdatum/tupnull.
+ *
+ * We copy tupdatum/tupnull into skey's sk_argument iff set_elem_result == 0.
+ * Otherwise, we set skey to either the lowest or highest value that's within
+ * the range of caller's skip array (whichever is the best available match to
+ * tupdatum/tupnull that is still within the range of the skip array according
+ * to _bt_binsrch_skiparray_skey/set_elem_result).
+ */
+static void
+_bt_skiparray_set_element(Relation rel, ScanKey skey, BTArrayKeyInfo *array,
+ int32 set_elem_result, Datum tupdatum, bool tupnull)
+{
+ Assert(skey->sk_flags & SK_BT_SKIP);
+ Assert(skey->sk_flags & SK_SEARCHARRAY);
+
+ if (set_elem_result)
+ {
+ /* tupdatum/tupnull is out of the range of the skip array */
+ Assert(!array->null_elem);
+
+ _bt_array_set_low_or_high(rel, skey, array, set_elem_result < 0);
+ return;
+ }
+
+ /* Advance skip array to tupdatum (or tupnull) value */
+ if (unlikely(tupnull))
+ {
+ _bt_skiparray_set_isnull(rel, skey, array);
+ return;
+ }
+
+ /* Free memory previously allocated for sk_argument if needed */
+ if (!array->attbyval && skey->sk_argument)
+ pfree(DatumGetPointer(skey->sk_argument));
+
+ /* tupdatum becomes new sk_argument/new current element */
+ skey->sk_flags &= ~(SK_SEARCHNULL | SK_ISNULL |
+ SK_BT_MINVAL | SK_BT_MAXVAL |
+ SK_BT_NEXT | SK_BT_PRIOR);
+ skey->sk_argument = datumCopy(tupdatum, array->attbyval, array->attlen);
+}
+
+/*
+ * _bt_skiparray_set_isnull() -- set skip array scan key to NULL
+ */
+static void
+_bt_skiparray_set_isnull(Relation rel, ScanKey skey, BTArrayKeyInfo *array)
+{
+ Assert(skey->sk_flags & SK_BT_SKIP);
+ Assert(skey->sk_flags & SK_SEARCHARRAY);
+ Assert(array->null_elem && !array->low_compare && !array->high_compare);
+
+ /* Free memory previously allocated for sk_argument if needed */
+ if (!array->attbyval && skey->sk_argument)
+ pfree(DatumGetPointer(skey->sk_argument));
+
+ /* NULL becomes new sk_argument/new current element */
+ skey->sk_argument = (Datum) 0;
+ skey->sk_flags &= ~(SK_BT_MINVAL | SK_BT_MAXVAL |
+ SK_BT_NEXT | SK_BT_PRIOR);
+ skey->sk_flags |= (SK_SEARCHNULL | SK_ISNULL);
+}
+
+/*
* _bt_start_array_keys() -- Initialize array keys at start of a scan
*
* Set up the cur_elem counters and fill in the first sk_argument value for
@@ -414,30 +608,356 @@ _bt_binsrch_array_skey(FmgrInfo *orderproc,
void
_bt_start_array_keys(IndexScanDesc scan, ScanDirection dir)
{
+ Relation rel = scan->indexRelation;
BTScanOpaque so = (BTScanOpaque) scan->opaque;
- int i;
Assert(so->numArrayKeys);
Assert(so->qual_ok);
- for (i = 0; i < so->numArrayKeys; i++)
+ for (int i = 0; i < so->numArrayKeys; i++)
{
- BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i];
- ScanKey skey = &so->keyData[curArrayKey->scan_key];
+ BTArrayKeyInfo *array = &so->arrayKeys[i];
+ ScanKey skey = &so->keyData[array->scan_key];
- Assert(curArrayKey->num_elems > 0);
Assert(skey->sk_flags & SK_SEARCHARRAY);
- if (ScanDirectionIsBackward(dir))
- curArrayKey->cur_elem = curArrayKey->num_elems - 1;
- else
- curArrayKey->cur_elem = 0;
- skey->sk_argument = curArrayKey->elem_values[curArrayKey->cur_elem];
+ _bt_array_set_low_or_high(rel, skey, array,
+ ScanDirectionIsForward(dir));
}
so->scanBehind = so->oppositeDirCheck = false; /* reset */
}
/*
+ * _bt_array_set_low_or_high() -- Set array scan key to lowest/highest element
+ *
+ * Caller also passes associated scan key, which will have its argument set to
+ * the lowest/highest array value in passing.
+ */
+static void
+_bt_array_set_low_or_high(Relation rel, ScanKey skey, BTArrayKeyInfo *array,
+ bool low_not_high)
+{
+ Assert(skey->sk_flags & SK_SEARCHARRAY);
+
+ if (array->num_elems != -1)
+ {
+ /* set low or high element for SAOP array */
+ int set_elem = 0;
+
+ Assert(!(skey->sk_flags & SK_BT_SKIP));
+
+ if (!low_not_high)
+ set_elem = array->num_elems - 1;
+
+ /*
+ * Just copy over array datum (only skip arrays require freeing and
+ * allocating memory for sk_argument)
+ */
+ array->cur_elem = set_elem;
+ skey->sk_argument = array->elem_values[set_elem];
+
+ return;
+ }
+
+ /* set low or high element for skip array */
+ Assert(skey->sk_flags & SK_BT_SKIP);
+ Assert(array->num_elems == -1);
+
+ /* Free memory previously allocated for sk_argument if needed */
+ if (!array->attbyval && skey->sk_argument)
+ pfree(DatumGetPointer(skey->sk_argument));
+
+ /* Reset flags */
+ skey->sk_argument = (Datum) 0;
+ skey->sk_flags &= ~(SK_SEARCHNULL | SK_ISNULL |
+ SK_BT_MINVAL | SK_BT_MAXVAL |
+ SK_BT_NEXT | SK_BT_PRIOR);
+
+ if (array->null_elem &&
+ (low_not_high == ((skey->sk_flags & SK_BT_NULLS_FIRST) != 0)))
+ {
+ /* Requested element (either lowest or highest) has the value NULL */
+ skey->sk_flags |= (SK_SEARCHNULL | SK_ISNULL);
+ }
+ else if (low_not_high)
+ {
+ /* Setting array to lowest element (according to low_compare) */
+ skey->sk_flags |= SK_BT_MINVAL;
+ }
+ else
+ {
+ /* Setting array to highest element (according to high_compare) */
+ skey->sk_flags |= SK_BT_MAXVAL;
+ }
+}
+
+/*
+ * _bt_array_decrement() -- decrement array scan key's sk_argument
+ *
+ * Return value indicates whether caller's array was successfully decremented.
+ * Cannot decrement an array whose current element is already the first one.
+ */
+static bool
+_bt_array_decrement(Relation rel, ScanKey skey, BTArrayKeyInfo *array)
+{
+ bool uflow = false;
+ Datum dec_sk_argument;
+
+ Assert(skey->sk_flags & SK_SEARCHARRAY);
+ Assert(!(skey->sk_flags & (SK_BT_MAXVAL | SK_BT_NEXT | SK_BT_PRIOR)));
+
+ /* SAOP array? */
+ if (array->num_elems != -1)
+ {
+ Assert(!(skey->sk_flags & (SK_BT_SKIP | SK_BT_MINVAL | SK_BT_MAXVAL)));
+ if (array->cur_elem > 0)
+ {
+ /*
+ * Just decrement current element, and assign its datum to skey
+ * (only skip arrays need us to free existing sk_argument memory)
+ */
+ array->cur_elem--;
+ skey->sk_argument = array->elem_values[array->cur_elem];
+
+ /* Successfully decremented array */
+ return true;
+ }
+
+ /* Cannot decrement to before first array element */
+ return false;
+ }
+
+ /* Nope, this is a skip array */
+ Assert(skey->sk_flags & SK_BT_SKIP);
+
+ /*
+ * The sentinel value that represents the minimum value within the range
+ * of a skip array (often just -inf) is never decrementable
+ */
+ if (skey->sk_flags & SK_BT_MINVAL)
+ return false;
+
+ /*
+ * When the current array element is NULL, and the lowest sorting value in
+ * the index is also NULL, we cannot decrement before first array element
+ */
+ if ((skey->sk_flags & SK_ISNULL) && (skey->sk_flags & SK_BT_NULLS_FIRST))
+ return false;
+
+ /*
+ * Opclasses without skip support "decrement" the scan key's current
+ * element by setting the PRIOR flag. The true prior value is determined
+ * by repositioning to the last index tuple < existing sk_argument/current
+ * array element. Note that this works in the usual way when the scan key
+ * is already marked ISNULL (i.e. when the current element is NULL).
+ */
+ if (!array->sksup)
+ {
+ /* Successfully "decremented" array */
+ skey->sk_flags |= SK_BT_PRIOR;
+ return true;
+ }
+
+ /*
+ * Opclasses with skip support directly decrement sk_argument
+ */
+ if (skey->sk_flags & SK_ISNULL)
+ {
+ Assert(!(skey->sk_flags & SK_BT_NULLS_FIRST));
+
+ /*
+ * Existing sk_argument/array element is NULL (for an IS NULL qual).
+ *
+ * "Decrement" from NULL to the high_elem value provided by opclass
+ * skip support routine.
+ */
+ skey->sk_flags &= ~(SK_SEARCHNULL | SK_ISNULL);
+ skey->sk_argument = datumCopy(array->sksup->high_elem,
+ array->attbyval, array->attlen);
+ return true;
+ }
+
+ /*
+ * Ask opclass support routine to provide decremented copy of existing
+ * non-NULL sk_argument
+ */
+ dec_sk_argument = array->sksup->decrement(rel, skey->sk_argument, &uflow);
+ if (unlikely(uflow))
+ {
+ /* dec_sk_argument has undefined value (so no pfree) */
+ if (array->null_elem && (skey->sk_flags & SK_BT_NULLS_FIRST))
+ {
+ _bt_skiparray_set_isnull(rel, skey, array);
+
+ /* Successfully "decremented" array to NULL */
+ return true;
+ }
+
+ /* Cannot decrement to before first array element */
+ return false;
+ }
+
+ /*
+ * Successfully decremented sk_argument to a non-NULL value. Make sure
+ * that the decremented value is still within the range of the array.
+ */
+ if (array->low_compare &&
+ !DatumGetBool(FunctionCall2Coll(&array->low_compare->sk_func,
+ array->low_compare->sk_collation,
+ dec_sk_argument,
+ array->low_compare->sk_argument)))
+ {
+ /* Keep existing sk_argument after all */
+ if (!array->attbyval)
+ pfree(DatumGetPointer(dec_sk_argument));
+
+ /* Cannot decrement to before first array element */
+ return false;
+ }
+
+ /* Accept value returned by opclass decrement callback */
+ if (!array->attbyval && skey->sk_argument)
+ pfree(DatumGetPointer(skey->sk_argument));
+ skey->sk_argument = dec_sk_argument;
+
+ /* Successfully decremented array */
+ return true;
+}
+
+/*
+ * _bt_array_increment() -- increment array scan key's sk_argument
+ *
+ * Return value indicates whether caller's array was successfully incremented.
+ * Cannot increment an array whose current element is already the final one.
+ */
+static bool
+_bt_array_increment(Relation rel, ScanKey skey, BTArrayKeyInfo *array)
+{
+ bool oflow = false;
+ Datum inc_sk_argument;
+
+ Assert(skey->sk_flags & SK_SEARCHARRAY);
+ Assert(!(skey->sk_flags & (SK_BT_MINVAL | SK_BT_NEXT | SK_BT_PRIOR)));
+
+ /* SAOP array? */
+ if (array->num_elems != -1)
+ {
+ Assert(!(skey->sk_flags & (SK_BT_SKIP | SK_BT_MINVAL | SK_BT_MAXVAL)));
+ if (array->cur_elem < array->num_elems - 1)
+ {
+ /*
+ * Just increment current element, and assign its datum to skey
+ * (only skip arrays need us to free existing sk_argument memory)
+ */
+ array->cur_elem++;
+ skey->sk_argument = array->elem_values[array->cur_elem];
+
+ /* Successfully incremented array */
+ return true;
+ }
+
+ /* Cannot increment past final array element */
+ return false;
+ }
+
+ /* Nope, this is a skip array */
+ Assert(skey->sk_flags & SK_BT_SKIP);
+
+ /*
+ * The sentinel value that represents the maximum value within the range
+ * of a skip array (often just +inf) is never incrementable
+ */
+ if (skey->sk_flags & SK_BT_MAXVAL)
+ return false;
+
+ /*
+ * When the current array element is NULL, and the highest sorting value
+ * in the index is also NULL, we cannot increment past the final element
+ */
+ if ((skey->sk_flags & SK_ISNULL) && !(skey->sk_flags & SK_BT_NULLS_FIRST))
+ return false;
+
+ /*
+ * Opclasses without skip support "increment" the scan key's current
+ * element by setting the NEXT flag. The true next value is determined by
+ * repositioning to the first index tuple > existing sk_argument/current
+ * array element. Note that this works in the usual way when the scan key
+ * is already marked ISNULL (i.e. when the current element is NULL).
+ */
+ if (!array->sksup)
+ {
+ /* Successfully "incremented" array */
+ skey->sk_flags |= SK_BT_NEXT;
+ return true;
+ }
+
+ /*
+ * Opclasses with skip support directly increment sk_argument
+ */
+ if (skey->sk_flags & SK_ISNULL)
+ {
+ Assert(skey->sk_flags & SK_BT_NULLS_FIRST);
+
+ /*
+ * Existing sk_argument/array element is NULL (for an IS NULL qual).
+ *
+ * "Increment" from NULL to the low_elem value provided by opclass
+ * skip support routine.
+ */
+ skey->sk_flags &= ~(SK_SEARCHNULL | SK_ISNULL);
+ skey->sk_argument = datumCopy(array->sksup->low_elem,
+ array->attbyval, array->attlen);
+ return true;
+ }
+
+ /*
+ * Ask opclass support routine to provide incremented copy of existing
+ * non-NULL sk_argument
+ */
+ inc_sk_argument = array->sksup->increment(rel, skey->sk_argument, &oflow);
+ if (unlikely(oflow))
+ {
+ /* inc_sk_argument has undefined value (so no pfree) */
+ if (array->null_elem && !(skey->sk_flags & SK_BT_NULLS_FIRST))
+ {
+ _bt_skiparray_set_isnull(rel, skey, array);
+
+ /* Successfully "incremented" array to NULL */
+ return true;
+ }
+
+ /* Cannot increment past final array element */
+ return false;
+ }
+
+ /*
+ * Successfully incremented sk_argument to a non-NULL value. Make sure
+ * that the incremented value is still within the range of the array.
+ */
+ if (array->high_compare &&
+ !DatumGetBool(FunctionCall2Coll(&array->high_compare->sk_func,
+ array->high_compare->sk_collation,
+ inc_sk_argument,
+ array->high_compare->sk_argument)))
+ {
+ /* Keep existing sk_argument after all */
+ if (!array->attbyval)
+ pfree(DatumGetPointer(inc_sk_argument));
+
+ /* Cannot increment past final array element */
+ return false;
+ }
+
+ /* Accept value returned by opclass increment callback */
+ if (!array->attbyval && skey->sk_argument)
+ pfree(DatumGetPointer(skey->sk_argument));
+ skey->sk_argument = inc_sk_argument;
+
+ /* Successfully incremented array */
+ return true;
+}
+
+/*
* _bt_advance_array_keys_increment() -- Advance to next set of array elements
*
* Advances the array keys by a single increment in the current scan
@@ -452,6 +972,7 @@ _bt_start_array_keys(IndexScanDesc scan, ScanDirection dir)
static bool
_bt_advance_array_keys_increment(IndexScanDesc scan, ScanDirection dir)
{
+ Relation rel = scan->indexRelation;
BTScanOpaque so = (BTScanOpaque) scan->opaque;
/*
@@ -461,29 +982,30 @@ _bt_advance_array_keys_increment(IndexScanDesc scan, ScanDirection dir)
*/
for (int i = so->numArrayKeys - 1; i >= 0; i--)
{
- BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i];
- ScanKey skey = &so->keyData[curArrayKey->scan_key];
- int cur_elem = curArrayKey->cur_elem;
- int num_elems = curArrayKey->num_elems;
- bool rolled = false;
+ BTArrayKeyInfo *array = &so->arrayKeys[i];
+ ScanKey skey = &so->keyData[array->scan_key];
- if (ScanDirectionIsForward(dir) && ++cur_elem >= num_elems)
+ if (ScanDirectionIsForward(dir))
{
- cur_elem = 0;
- rolled = true;
+ if (_bt_array_increment(rel, skey, array))
+ return true;
}
- else if (ScanDirectionIsBackward(dir) && --cur_elem < 0)
+ else
{
- cur_elem = num_elems - 1;
- rolled = true;
+ if (_bt_array_decrement(rel, skey, array))
+ return true;
}
- curArrayKey->cur_elem = cur_elem;
- skey->sk_argument = curArrayKey->elem_values[cur_elem];
- if (!rolled)
- return true;
+ /*
+ * Couldn't increment (or decrement) array. Handle array roll over.
+ *
+ * Start over at the array's lowest sorting value (or its highest
+ * value, for backward scans)...
+ */
+ _bt_array_set_low_or_high(rel, skey, array,
+ ScanDirectionIsForward(dir));
- /* Need to advance next array key, if any */
+ /* ...then increment (or decrement) next most significant array */
}
/*
@@ -507,7 +1029,7 @@ _bt_advance_array_keys_increment(IndexScanDesc scan, ScanDirection dir)
}
/*
- * _bt_rewind_nonrequired_arrays() -- Rewind non-required arrays
+ * _bt_rewind_nonrequired_arrays() -- Rewind SAOP arrays not marked required
*
* Called when _bt_advance_array_keys decides to start a new primitive index
* scan on the basis of the current scan position being before the position
@@ -539,10 +1061,15 @@ _bt_advance_array_keys_increment(IndexScanDesc scan, ScanDirection dir)
*
* Note: _bt_verify_arrays_bt_first is called by an assertion to enforce that
* everybody got this right.
+ *
+ * Note: In practice almost all SAOP arrays are marked required during
+ * preprocessing (if necessary by generating skip arrays). It is hardly ever
+ * truly necessary to call here, but consistently doing so is simpler.
*/
static void
_bt_rewind_nonrequired_arrays(IndexScanDesc scan, ScanDirection dir)
{
+ Relation rel = scan->indexRelation;
BTScanOpaque so = (BTScanOpaque) scan->opaque;
int arrayidx = 0;
@@ -550,7 +1077,6 @@ _bt_rewind_nonrequired_arrays(IndexScanDesc scan, ScanDirection dir)
{
ScanKey cur = so->keyData + ikey;
BTArrayKeyInfo *array = NULL;
- int first_elem_dir;
if (!(cur->sk_flags & SK_SEARCHARRAY) ||
cur->sk_strategy != BTEqualStrategyNumber)
@@ -562,16 +1088,10 @@ _bt_rewind_nonrequired_arrays(IndexScanDesc scan, ScanDirection dir)
if ((cur->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)))
continue;
- if (ScanDirectionIsForward(dir))
- first_elem_dir = 0;
- else
- first_elem_dir = array->num_elems - 1;
+ Assert(array->num_elems != -1); /* No non-required skip arrays */
- if (array->cur_elem != first_elem_dir)
- {
- array->cur_elem = first_elem_dir;
- cur->sk_argument = array->elem_values[first_elem_dir];
- }
+ _bt_array_set_low_or_high(rel, cur, array,
+ ScanDirectionIsForward(dir));
}
}
@@ -696,9 +1216,77 @@ _bt_tuple_before_array_skeys(IndexScanDesc scan, ScanDirection dir,
tupdatum = index_getattr(tuple, cur->sk_attno, tupdesc, &tupnull);
- result = _bt_compare_array_skey(&so->orderProcs[ikey],
- tupdatum, tupnull,
- cur->sk_argument, cur);
+ if (likely(!(cur->sk_flags & (SK_BT_MINVAL | SK_BT_MAXVAL))))
+ {
+ /* Scankey has a valid/comparable sk_argument value */
+ result = _bt_compare_array_skey(&so->orderProcs[ikey],
+ tupdatum, tupnull,
+ cur->sk_argument, cur);
+
+ if (result == 0)
+ {
+ /*
+ * Interpret result in a way that takes NEXT/PRIOR into
+ * account
+ */
+ if (cur->sk_flags & SK_BT_NEXT)
+ result = -1;
+ else if (cur->sk_flags & SK_BT_PRIOR)
+ result = 1;
+
+ Assert(result == 0 || (cur->sk_flags & SK_BT_SKIP));
+ }
+ }
+ else
+ {
+ BTArrayKeyInfo *array = NULL;
+
+ /*
+ * Current array element/array = scan key value is a sentinel
+ * value that represents the lowest (or highest) possible value
+ * that's still within the range of the array.
+ *
+ * Like _bt_first, we only see MINVAL keys during forwards scans
+ * (and similarly only see MAXVAL keys during backwards scans).
+ * Even if the scan's direction changes, we'll stop at some higher
+ * order key before we can ever reach any MAXVAL (or MINVAL) keys.
+ * (However, unlike _bt_first we _can_ get to keys marked either
+ * NEXT or PRIOR, regardless of the scan's current direction.)
+ */
+ Assert(ScanDirectionIsForward(dir) ?
+ !(cur->sk_flags & SK_BT_MAXVAL) :
+ !(cur->sk_flags & SK_BT_MINVAL));
+
+ /*
+ * There are no valid sk_argument values in MINVAL/MAXVAL keys.
+ * Check if tupdatum is within the range of skip array instead.
+ */
+ for (int arrayidx = 0; arrayidx < so->numArrayKeys; arrayidx++)
+ {
+ array = &so->arrayKeys[arrayidx];
+ if (array->scan_key == ikey)
+ break;
+ }
+
+ _bt_binsrch_skiparray_skey(false, dir, tupdatum, tupnull,
+ array, cur, &result);
+
+ if (result == 0)
+ {
+ /*
+ * tupdatum satisfies both low_compare and high_compare, so
+ * it's time to advance the array keys.
+ *
+ * Note: It's possible that the skip array will "advance" from
+ * its MINVAL (or MAXVAL) representation to an alternative,
+ * logically equivalent representation of the same value: a
+ * representation where the = key gets a valid datum in its
+ * sk_argument. This is only possible when low_compare uses
+ * the >= strategy (or high_compare uses the <= strategy).
+ */
+ return false;
+ }
+ }
/*
* Does this comparison indicate that caller must _not_ advance the
@@ -1017,18 +1605,9 @@ _bt_advance_array_keys(IndexScanDesc scan, BTReadPageState *pstate,
*/
if (beyond_end_advance)
{
- int final_elem_dir;
-
- if (ScanDirectionIsBackward(dir) || !array)
- final_elem_dir = 0;
- else
- final_elem_dir = array->num_elems - 1;
-
- if (array && array->cur_elem != final_elem_dir)
- {
- array->cur_elem = final_elem_dir;
- cur->sk_argument = array->elem_values[final_elem_dir];
- }
+ if (array)
+ _bt_array_set_low_or_high(rel, cur, array,
+ ScanDirectionIsBackward(dir));
continue;
}
@@ -1053,18 +1632,9 @@ _bt_advance_array_keys(IndexScanDesc scan, BTReadPageState *pstate,
*/
if (!all_required_satisfied || cur->sk_attno > tupnatts)
{
- int first_elem_dir;
-
- if (ScanDirectionIsForward(dir) || !array)
- first_elem_dir = 0;
- else
- first_elem_dir = array->num_elems - 1;
-
- if (array && array->cur_elem != first_elem_dir)
- {
- array->cur_elem = first_elem_dir;
- cur->sk_argument = array->elem_values[first_elem_dir];
- }
+ if (array)
+ _bt_array_set_low_or_high(rel, cur, array,
+ ScanDirectionIsForward(dir));
continue;
}
@@ -1080,14 +1650,22 @@ _bt_advance_array_keys(IndexScanDesc scan, BTReadPageState *pstate,
bool cur_elem_trig = (sktrig_required && ikey == sktrig);
/*
- * Binary search for closest match that's available from the array
+ * "Binary search" by checking if tupdatum/tupnull are within the
+ * range of the skip array
*/
- set_elem = _bt_binsrch_array_skey(&so->orderProcs[ikey],
- cur_elem_trig, dir,
- tupdatum, tupnull, array, cur,
- &result);
+ if (array->num_elems == -1)
+ _bt_binsrch_skiparray_skey(cur_elem_trig, dir,
+ tupdatum, tupnull, array, cur,
+ &result);
- Assert(set_elem >= 0 && set_elem < array->num_elems);
+ /*
+ * Binary search for the closest match from the SAOP array
+ */
+ else
+ set_elem = _bt_binsrch_array_skey(&so->orderProcs[ikey],
+ cur_elem_trig, dir,
+ tupdatum, tupnull, array, cur,
+ &result);
}
else
{
@@ -1163,11 +1741,21 @@ _bt_advance_array_keys(IndexScanDesc scan, BTReadPageState *pstate,
}
}
- /* Advance array keys, even when set_elem isn't an exact match */
- if (array && array->cur_elem != set_elem)
+ /* Advance array keys, even when we don't have an exact match */
+ if (array)
{
- array->cur_elem = set_elem;
- cur->sk_argument = array->elem_values[set_elem];
+ if (array->num_elems == -1)
+ {
+ /* Skip array's new element is tupdatum (or MINVAL/MAXVAL) */
+ _bt_skiparray_set_element(rel, cur, array, result,
+ tupdatum, tupnull);
+ }
+ else if (array->cur_elem != set_elem)
+ {
+ /* SAOP array's new element is set_elem datum */
+ array->cur_elem = set_elem;
+ cur->sk_argument = array->elem_values[set_elem];
+ }
}
}
@@ -1581,10 +2169,11 @@ _bt_verify_keys_with_arraykeys(IndexScanDesc scan)
if (array->scan_key != ikey)
return false;
- if (array->num_elems <= 0)
+ if (array->num_elems == 0 || array->num_elems < -1)
return false;
- if (cur->sk_argument != array->elem_values[array->cur_elem])
+ if (array->num_elems != -1 &&
+ cur->sk_argument != array->elem_values[array->cur_elem])
return false;
if (last_sk_attno > cur->sk_attno)
return false;
@@ -1914,6 +2503,20 @@ _bt_check_compare(IndexScanDesc scan, ScanDirection dir,
continue;
}
+ /*
+ * A skip array scan key uses one of several sentinel values. We just
+ * fall back on _bt_tuple_before_array_skeys when we see such a value.
+ */
+ if (key->sk_flags & (SK_BT_MINVAL | SK_BT_MAXVAL |
+ SK_BT_NEXT | SK_BT_PRIOR))
+ {
+ Assert(key->sk_flags & SK_SEARCHARRAY);
+ Assert(key->sk_flags & SK_BT_SKIP);
+
+ *continuescan = false;
+ return false;
+ }
+
/* row-comparison keys need special processing */
if (key->sk_flags & SK_ROW_HEADER)
{
@@ -1939,6 +2542,7 @@ _bt_check_compare(IndexScanDesc scan, ScanDirection dir,
else
{
Assert(key->sk_flags & SK_SEARCHNOTNULL);
+ Assert(!(key->sk_flags & SK_BT_SKIP));
if (!isNull)
continue; /* tuple satisfies this qual */
}