diff options
Diffstat (limited to 'src/backend/access/nbtree/nbtutils.c')
-rw-r--r-- | src/backend/access/nbtree/nbtutils.c | 756 |
1 files changed, 680 insertions, 76 deletions
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c index 2aee9bbf67d..108030a8ee7 100644 --- a/src/backend/access/nbtree/nbtutils.c +++ b/src/backend/access/nbtree/nbtutils.c @@ -30,6 +30,17 @@ static inline int32 _bt_compare_array_skey(FmgrInfo *orderproc, Datum tupdatum, bool tupnull, Datum arrdatum, ScanKey cur); +static void _bt_binsrch_skiparray_skey(bool cur_elem_trig, ScanDirection dir, + Datum tupdatum, bool tupnull, + BTArrayKeyInfo *array, ScanKey cur, + int32 *set_elem_result); +static void _bt_skiparray_set_element(Relation rel, ScanKey skey, BTArrayKeyInfo *array, + int32 set_elem_result, Datum tupdatum, bool tupnull); +static void _bt_skiparray_set_isnull(Relation rel, ScanKey skey, BTArrayKeyInfo *array); +static void _bt_array_set_low_or_high(Relation rel, ScanKey skey, + BTArrayKeyInfo *array, bool low_not_high); +static bool _bt_array_decrement(Relation rel, ScanKey skey, BTArrayKeyInfo *array); +static bool _bt_array_increment(Relation rel, ScanKey skey, BTArrayKeyInfo *array); static bool _bt_advance_array_keys_increment(IndexScanDesc scan, ScanDirection dir); static void _bt_rewind_nonrequired_arrays(IndexScanDesc scan, ScanDirection dir); static bool _bt_tuple_before_array_skeys(IndexScanDesc scan, ScanDirection dir, @@ -207,6 +218,7 @@ _bt_compare_array_skey(FmgrInfo *orderproc, int32 result = 0; Assert(cur->sk_strategy == BTEqualStrategyNumber); + Assert(!(cur->sk_flags & (SK_BT_MINVAL | SK_BT_MAXVAL))); if (tupnull) /* NULL tupdatum */ { @@ -283,6 +295,8 @@ _bt_binsrch_array_skey(FmgrInfo *orderproc, Datum arrdatum; Assert(cur->sk_flags & SK_SEARCHARRAY); + Assert(!(cur->sk_flags & SK_BT_SKIP)); + Assert(!(cur->sk_flags & SK_ISNULL)); /* SAOP arrays never have NULLs */ Assert(cur->sk_strategy == BTEqualStrategyNumber); if (cur_elem_trig) @@ -406,6 +420,186 @@ _bt_binsrch_array_skey(FmgrInfo *orderproc, } /* + * _bt_binsrch_skiparray_skey() -- "Binary search" within a skip array + * + * Does not return an index into the array, since skip arrays don't really + * contain elements (they generate their array elements procedurally instead). + * Our interface matches that of _bt_binsrch_array_skey in every other way. + * + * Sets *set_elem_result just like _bt_binsrch_array_skey would with a true + * array. The value 0 indicates that tupdatum/tupnull is within the range of + * the skip array. We return -1 when tupdatum/tupnull is lower that any value + * within the range of the array, and 1 when it is higher than every value. + * Caller should pass *set_elem_result to _bt_skiparray_set_element to advance + * the array. + * + * cur_elem_trig indicates if array advancement was triggered by this array's + * scan key. We use this to optimize-away comparisons that are known by our + * caller to be unnecessary from context, just like _bt_binsrch_array_skey. + */ +static void +_bt_binsrch_skiparray_skey(bool cur_elem_trig, ScanDirection dir, + Datum tupdatum, bool tupnull, + BTArrayKeyInfo *array, ScanKey cur, + int32 *set_elem_result) +{ + Assert(cur->sk_flags & SK_BT_SKIP); + Assert(cur->sk_flags & SK_SEARCHARRAY); + Assert(cur->sk_flags & SK_BT_REQFWD); + Assert(array->num_elems == -1); + Assert(!ScanDirectionIsNoMovement(dir)); + + if (array->null_elem) + { + Assert(!array->low_compare && !array->high_compare); + + *set_elem_result = 0; + return; + } + + if (tupnull) /* NULL tupdatum */ + { + if (cur->sk_flags & SK_BT_NULLS_FIRST) + *set_elem_result = -1; /* NULL "<" NOT_NULL */ + else + *set_elem_result = 1; /* NULL ">" NOT_NULL */ + return; + } + + /* + * Array inequalities determine whether tupdatum is within the range of + * caller's skip array + */ + *set_elem_result = 0; + if (ScanDirectionIsForward(dir)) + { + /* + * Evaluate low_compare first (unless cur_elem_trig tells us that it + * cannot possibly fail to be satisfied), then evaluate high_compare + */ + if (!cur_elem_trig && array->low_compare && + !DatumGetBool(FunctionCall2Coll(&array->low_compare->sk_func, + array->low_compare->sk_collation, + tupdatum, + array->low_compare->sk_argument))) + *set_elem_result = -1; + else if (array->high_compare && + !DatumGetBool(FunctionCall2Coll(&array->high_compare->sk_func, + array->high_compare->sk_collation, + tupdatum, + array->high_compare->sk_argument))) + *set_elem_result = 1; + } + else + { + /* + * Evaluate high_compare first (unless cur_elem_trig tells us that it + * cannot possibly fail to be satisfied), then evaluate low_compare + */ + if (!cur_elem_trig && array->high_compare && + !DatumGetBool(FunctionCall2Coll(&array->high_compare->sk_func, + array->high_compare->sk_collation, + tupdatum, + array->high_compare->sk_argument))) + *set_elem_result = 1; + else if (array->low_compare && + !DatumGetBool(FunctionCall2Coll(&array->low_compare->sk_func, + array->low_compare->sk_collation, + tupdatum, + array->low_compare->sk_argument))) + *set_elem_result = -1; + } + + /* + * Assert that any keys that were assumed to be satisfied already (due to + * caller passing cur_elem_trig=true) really are satisfied as expected + */ +#ifdef USE_ASSERT_CHECKING + if (cur_elem_trig) + { + if (ScanDirectionIsForward(dir) && array->low_compare) + Assert(DatumGetBool(FunctionCall2Coll(&array->low_compare->sk_func, + array->low_compare->sk_collation, + tupdatum, + array->low_compare->sk_argument))); + + if (ScanDirectionIsBackward(dir) && array->high_compare) + Assert(DatumGetBool(FunctionCall2Coll(&array->high_compare->sk_func, + array->high_compare->sk_collation, + tupdatum, + array->high_compare->sk_argument))); + } +#endif +} + +/* + * _bt_skiparray_set_element() -- Set skip array scan key's sk_argument + * + * Caller passes set_elem_result returned by _bt_binsrch_skiparray_skey for + * caller's tupdatum/tupnull. + * + * We copy tupdatum/tupnull into skey's sk_argument iff set_elem_result == 0. + * Otherwise, we set skey to either the lowest or highest value that's within + * the range of caller's skip array (whichever is the best available match to + * tupdatum/tupnull that is still within the range of the skip array according + * to _bt_binsrch_skiparray_skey/set_elem_result). + */ +static void +_bt_skiparray_set_element(Relation rel, ScanKey skey, BTArrayKeyInfo *array, + int32 set_elem_result, Datum tupdatum, bool tupnull) +{ + Assert(skey->sk_flags & SK_BT_SKIP); + Assert(skey->sk_flags & SK_SEARCHARRAY); + + if (set_elem_result) + { + /* tupdatum/tupnull is out of the range of the skip array */ + Assert(!array->null_elem); + + _bt_array_set_low_or_high(rel, skey, array, set_elem_result < 0); + return; + } + + /* Advance skip array to tupdatum (or tupnull) value */ + if (unlikely(tupnull)) + { + _bt_skiparray_set_isnull(rel, skey, array); + return; + } + + /* Free memory previously allocated for sk_argument if needed */ + if (!array->attbyval && skey->sk_argument) + pfree(DatumGetPointer(skey->sk_argument)); + + /* tupdatum becomes new sk_argument/new current element */ + skey->sk_flags &= ~(SK_SEARCHNULL | SK_ISNULL | + SK_BT_MINVAL | SK_BT_MAXVAL | + SK_BT_NEXT | SK_BT_PRIOR); + skey->sk_argument = datumCopy(tupdatum, array->attbyval, array->attlen); +} + +/* + * _bt_skiparray_set_isnull() -- set skip array scan key to NULL + */ +static void +_bt_skiparray_set_isnull(Relation rel, ScanKey skey, BTArrayKeyInfo *array) +{ + Assert(skey->sk_flags & SK_BT_SKIP); + Assert(skey->sk_flags & SK_SEARCHARRAY); + Assert(array->null_elem && !array->low_compare && !array->high_compare); + + /* Free memory previously allocated for sk_argument if needed */ + if (!array->attbyval && skey->sk_argument) + pfree(DatumGetPointer(skey->sk_argument)); + + /* NULL becomes new sk_argument/new current element */ + skey->sk_argument = (Datum) 0; + skey->sk_flags &= ~(SK_BT_MINVAL | SK_BT_MAXVAL | + SK_BT_NEXT | SK_BT_PRIOR); + skey->sk_flags |= (SK_SEARCHNULL | SK_ISNULL); +} + +/* * _bt_start_array_keys() -- Initialize array keys at start of a scan * * Set up the cur_elem counters and fill in the first sk_argument value for @@ -414,30 +608,356 @@ _bt_binsrch_array_skey(FmgrInfo *orderproc, void _bt_start_array_keys(IndexScanDesc scan, ScanDirection dir) { + Relation rel = scan->indexRelation; BTScanOpaque so = (BTScanOpaque) scan->opaque; - int i; Assert(so->numArrayKeys); Assert(so->qual_ok); - for (i = 0; i < so->numArrayKeys; i++) + for (int i = 0; i < so->numArrayKeys; i++) { - BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i]; - ScanKey skey = &so->keyData[curArrayKey->scan_key]; + BTArrayKeyInfo *array = &so->arrayKeys[i]; + ScanKey skey = &so->keyData[array->scan_key]; - Assert(curArrayKey->num_elems > 0); Assert(skey->sk_flags & SK_SEARCHARRAY); - if (ScanDirectionIsBackward(dir)) - curArrayKey->cur_elem = curArrayKey->num_elems - 1; - else - curArrayKey->cur_elem = 0; - skey->sk_argument = curArrayKey->elem_values[curArrayKey->cur_elem]; + _bt_array_set_low_or_high(rel, skey, array, + ScanDirectionIsForward(dir)); } so->scanBehind = so->oppositeDirCheck = false; /* reset */ } /* + * _bt_array_set_low_or_high() -- Set array scan key to lowest/highest element + * + * Caller also passes associated scan key, which will have its argument set to + * the lowest/highest array value in passing. + */ +static void +_bt_array_set_low_or_high(Relation rel, ScanKey skey, BTArrayKeyInfo *array, + bool low_not_high) +{ + Assert(skey->sk_flags & SK_SEARCHARRAY); + + if (array->num_elems != -1) + { + /* set low or high element for SAOP array */ + int set_elem = 0; + + Assert(!(skey->sk_flags & SK_BT_SKIP)); + + if (!low_not_high) + set_elem = array->num_elems - 1; + + /* + * Just copy over array datum (only skip arrays require freeing and + * allocating memory for sk_argument) + */ + array->cur_elem = set_elem; + skey->sk_argument = array->elem_values[set_elem]; + + return; + } + + /* set low or high element for skip array */ + Assert(skey->sk_flags & SK_BT_SKIP); + Assert(array->num_elems == -1); + + /* Free memory previously allocated for sk_argument if needed */ + if (!array->attbyval && skey->sk_argument) + pfree(DatumGetPointer(skey->sk_argument)); + + /* Reset flags */ + skey->sk_argument = (Datum) 0; + skey->sk_flags &= ~(SK_SEARCHNULL | SK_ISNULL | + SK_BT_MINVAL | SK_BT_MAXVAL | + SK_BT_NEXT | SK_BT_PRIOR); + + if (array->null_elem && + (low_not_high == ((skey->sk_flags & SK_BT_NULLS_FIRST) != 0))) + { + /* Requested element (either lowest or highest) has the value NULL */ + skey->sk_flags |= (SK_SEARCHNULL | SK_ISNULL); + } + else if (low_not_high) + { + /* Setting array to lowest element (according to low_compare) */ + skey->sk_flags |= SK_BT_MINVAL; + } + else + { + /* Setting array to highest element (according to high_compare) */ + skey->sk_flags |= SK_BT_MAXVAL; + } +} + +/* + * _bt_array_decrement() -- decrement array scan key's sk_argument + * + * Return value indicates whether caller's array was successfully decremented. + * Cannot decrement an array whose current element is already the first one. + */ +static bool +_bt_array_decrement(Relation rel, ScanKey skey, BTArrayKeyInfo *array) +{ + bool uflow = false; + Datum dec_sk_argument; + + Assert(skey->sk_flags & SK_SEARCHARRAY); + Assert(!(skey->sk_flags & (SK_BT_MAXVAL | SK_BT_NEXT | SK_BT_PRIOR))); + + /* SAOP array? */ + if (array->num_elems != -1) + { + Assert(!(skey->sk_flags & (SK_BT_SKIP | SK_BT_MINVAL | SK_BT_MAXVAL))); + if (array->cur_elem > 0) + { + /* + * Just decrement current element, and assign its datum to skey + * (only skip arrays need us to free existing sk_argument memory) + */ + array->cur_elem--; + skey->sk_argument = array->elem_values[array->cur_elem]; + + /* Successfully decremented array */ + return true; + } + + /* Cannot decrement to before first array element */ + return false; + } + + /* Nope, this is a skip array */ + Assert(skey->sk_flags & SK_BT_SKIP); + + /* + * The sentinel value that represents the minimum value within the range + * of a skip array (often just -inf) is never decrementable + */ + if (skey->sk_flags & SK_BT_MINVAL) + return false; + + /* + * When the current array element is NULL, and the lowest sorting value in + * the index is also NULL, we cannot decrement before first array element + */ + if ((skey->sk_flags & SK_ISNULL) && (skey->sk_flags & SK_BT_NULLS_FIRST)) + return false; + + /* + * Opclasses without skip support "decrement" the scan key's current + * element by setting the PRIOR flag. The true prior value is determined + * by repositioning to the last index tuple < existing sk_argument/current + * array element. Note that this works in the usual way when the scan key + * is already marked ISNULL (i.e. when the current element is NULL). + */ + if (!array->sksup) + { + /* Successfully "decremented" array */ + skey->sk_flags |= SK_BT_PRIOR; + return true; + } + + /* + * Opclasses with skip support directly decrement sk_argument + */ + if (skey->sk_flags & SK_ISNULL) + { + Assert(!(skey->sk_flags & SK_BT_NULLS_FIRST)); + + /* + * Existing sk_argument/array element is NULL (for an IS NULL qual). + * + * "Decrement" from NULL to the high_elem value provided by opclass + * skip support routine. + */ + skey->sk_flags &= ~(SK_SEARCHNULL | SK_ISNULL); + skey->sk_argument = datumCopy(array->sksup->high_elem, + array->attbyval, array->attlen); + return true; + } + + /* + * Ask opclass support routine to provide decremented copy of existing + * non-NULL sk_argument + */ + dec_sk_argument = array->sksup->decrement(rel, skey->sk_argument, &uflow); + if (unlikely(uflow)) + { + /* dec_sk_argument has undefined value (so no pfree) */ + if (array->null_elem && (skey->sk_flags & SK_BT_NULLS_FIRST)) + { + _bt_skiparray_set_isnull(rel, skey, array); + + /* Successfully "decremented" array to NULL */ + return true; + } + + /* Cannot decrement to before first array element */ + return false; + } + + /* + * Successfully decremented sk_argument to a non-NULL value. Make sure + * that the decremented value is still within the range of the array. + */ + if (array->low_compare && + !DatumGetBool(FunctionCall2Coll(&array->low_compare->sk_func, + array->low_compare->sk_collation, + dec_sk_argument, + array->low_compare->sk_argument))) + { + /* Keep existing sk_argument after all */ + if (!array->attbyval) + pfree(DatumGetPointer(dec_sk_argument)); + + /* Cannot decrement to before first array element */ + return false; + } + + /* Accept value returned by opclass decrement callback */ + if (!array->attbyval && skey->sk_argument) + pfree(DatumGetPointer(skey->sk_argument)); + skey->sk_argument = dec_sk_argument; + + /* Successfully decremented array */ + return true; +} + +/* + * _bt_array_increment() -- increment array scan key's sk_argument + * + * Return value indicates whether caller's array was successfully incremented. + * Cannot increment an array whose current element is already the final one. + */ +static bool +_bt_array_increment(Relation rel, ScanKey skey, BTArrayKeyInfo *array) +{ + bool oflow = false; + Datum inc_sk_argument; + + Assert(skey->sk_flags & SK_SEARCHARRAY); + Assert(!(skey->sk_flags & (SK_BT_MINVAL | SK_BT_NEXT | SK_BT_PRIOR))); + + /* SAOP array? */ + if (array->num_elems != -1) + { + Assert(!(skey->sk_flags & (SK_BT_SKIP | SK_BT_MINVAL | SK_BT_MAXVAL))); + if (array->cur_elem < array->num_elems - 1) + { + /* + * Just increment current element, and assign its datum to skey + * (only skip arrays need us to free existing sk_argument memory) + */ + array->cur_elem++; + skey->sk_argument = array->elem_values[array->cur_elem]; + + /* Successfully incremented array */ + return true; + } + + /* Cannot increment past final array element */ + return false; + } + + /* Nope, this is a skip array */ + Assert(skey->sk_flags & SK_BT_SKIP); + + /* + * The sentinel value that represents the maximum value within the range + * of a skip array (often just +inf) is never incrementable + */ + if (skey->sk_flags & SK_BT_MAXVAL) + return false; + + /* + * When the current array element is NULL, and the highest sorting value + * in the index is also NULL, we cannot increment past the final element + */ + if ((skey->sk_flags & SK_ISNULL) && !(skey->sk_flags & SK_BT_NULLS_FIRST)) + return false; + + /* + * Opclasses without skip support "increment" the scan key's current + * element by setting the NEXT flag. The true next value is determined by + * repositioning to the first index tuple > existing sk_argument/current + * array element. Note that this works in the usual way when the scan key + * is already marked ISNULL (i.e. when the current element is NULL). + */ + if (!array->sksup) + { + /* Successfully "incremented" array */ + skey->sk_flags |= SK_BT_NEXT; + return true; + } + + /* + * Opclasses with skip support directly increment sk_argument + */ + if (skey->sk_flags & SK_ISNULL) + { + Assert(skey->sk_flags & SK_BT_NULLS_FIRST); + + /* + * Existing sk_argument/array element is NULL (for an IS NULL qual). + * + * "Increment" from NULL to the low_elem value provided by opclass + * skip support routine. + */ + skey->sk_flags &= ~(SK_SEARCHNULL | SK_ISNULL); + skey->sk_argument = datumCopy(array->sksup->low_elem, + array->attbyval, array->attlen); + return true; + } + + /* + * Ask opclass support routine to provide incremented copy of existing + * non-NULL sk_argument + */ + inc_sk_argument = array->sksup->increment(rel, skey->sk_argument, &oflow); + if (unlikely(oflow)) + { + /* inc_sk_argument has undefined value (so no pfree) */ + if (array->null_elem && !(skey->sk_flags & SK_BT_NULLS_FIRST)) + { + _bt_skiparray_set_isnull(rel, skey, array); + + /* Successfully "incremented" array to NULL */ + return true; + } + + /* Cannot increment past final array element */ + return false; + } + + /* + * Successfully incremented sk_argument to a non-NULL value. Make sure + * that the incremented value is still within the range of the array. + */ + if (array->high_compare && + !DatumGetBool(FunctionCall2Coll(&array->high_compare->sk_func, + array->high_compare->sk_collation, + inc_sk_argument, + array->high_compare->sk_argument))) + { + /* Keep existing sk_argument after all */ + if (!array->attbyval) + pfree(DatumGetPointer(inc_sk_argument)); + + /* Cannot increment past final array element */ + return false; + } + + /* Accept value returned by opclass increment callback */ + if (!array->attbyval && skey->sk_argument) + pfree(DatumGetPointer(skey->sk_argument)); + skey->sk_argument = inc_sk_argument; + + /* Successfully incremented array */ + return true; +} + +/* * _bt_advance_array_keys_increment() -- Advance to next set of array elements * * Advances the array keys by a single increment in the current scan @@ -452,6 +972,7 @@ _bt_start_array_keys(IndexScanDesc scan, ScanDirection dir) static bool _bt_advance_array_keys_increment(IndexScanDesc scan, ScanDirection dir) { + Relation rel = scan->indexRelation; BTScanOpaque so = (BTScanOpaque) scan->opaque; /* @@ -461,29 +982,30 @@ _bt_advance_array_keys_increment(IndexScanDesc scan, ScanDirection dir) */ for (int i = so->numArrayKeys - 1; i >= 0; i--) { - BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i]; - ScanKey skey = &so->keyData[curArrayKey->scan_key]; - int cur_elem = curArrayKey->cur_elem; - int num_elems = curArrayKey->num_elems; - bool rolled = false; + BTArrayKeyInfo *array = &so->arrayKeys[i]; + ScanKey skey = &so->keyData[array->scan_key]; - if (ScanDirectionIsForward(dir) && ++cur_elem >= num_elems) + if (ScanDirectionIsForward(dir)) { - cur_elem = 0; - rolled = true; + if (_bt_array_increment(rel, skey, array)) + return true; } - else if (ScanDirectionIsBackward(dir) && --cur_elem < 0) + else { - cur_elem = num_elems - 1; - rolled = true; + if (_bt_array_decrement(rel, skey, array)) + return true; } - curArrayKey->cur_elem = cur_elem; - skey->sk_argument = curArrayKey->elem_values[cur_elem]; - if (!rolled) - return true; + /* + * Couldn't increment (or decrement) array. Handle array roll over. + * + * Start over at the array's lowest sorting value (or its highest + * value, for backward scans)... + */ + _bt_array_set_low_or_high(rel, skey, array, + ScanDirectionIsForward(dir)); - /* Need to advance next array key, if any */ + /* ...then increment (or decrement) next most significant array */ } /* @@ -507,7 +1029,7 @@ _bt_advance_array_keys_increment(IndexScanDesc scan, ScanDirection dir) } /* - * _bt_rewind_nonrequired_arrays() -- Rewind non-required arrays + * _bt_rewind_nonrequired_arrays() -- Rewind SAOP arrays not marked required * * Called when _bt_advance_array_keys decides to start a new primitive index * scan on the basis of the current scan position being before the position @@ -539,10 +1061,15 @@ _bt_advance_array_keys_increment(IndexScanDesc scan, ScanDirection dir) * * Note: _bt_verify_arrays_bt_first is called by an assertion to enforce that * everybody got this right. + * + * Note: In practice almost all SAOP arrays are marked required during + * preprocessing (if necessary by generating skip arrays). It is hardly ever + * truly necessary to call here, but consistently doing so is simpler. */ static void _bt_rewind_nonrequired_arrays(IndexScanDesc scan, ScanDirection dir) { + Relation rel = scan->indexRelation; BTScanOpaque so = (BTScanOpaque) scan->opaque; int arrayidx = 0; @@ -550,7 +1077,6 @@ _bt_rewind_nonrequired_arrays(IndexScanDesc scan, ScanDirection dir) { ScanKey cur = so->keyData + ikey; BTArrayKeyInfo *array = NULL; - int first_elem_dir; if (!(cur->sk_flags & SK_SEARCHARRAY) || cur->sk_strategy != BTEqualStrategyNumber) @@ -562,16 +1088,10 @@ _bt_rewind_nonrequired_arrays(IndexScanDesc scan, ScanDirection dir) if ((cur->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD))) continue; - if (ScanDirectionIsForward(dir)) - first_elem_dir = 0; - else - first_elem_dir = array->num_elems - 1; + Assert(array->num_elems != -1); /* No non-required skip arrays */ - if (array->cur_elem != first_elem_dir) - { - array->cur_elem = first_elem_dir; - cur->sk_argument = array->elem_values[first_elem_dir]; - } + _bt_array_set_low_or_high(rel, cur, array, + ScanDirectionIsForward(dir)); } } @@ -696,9 +1216,77 @@ _bt_tuple_before_array_skeys(IndexScanDesc scan, ScanDirection dir, tupdatum = index_getattr(tuple, cur->sk_attno, tupdesc, &tupnull); - result = _bt_compare_array_skey(&so->orderProcs[ikey], - tupdatum, tupnull, - cur->sk_argument, cur); + if (likely(!(cur->sk_flags & (SK_BT_MINVAL | SK_BT_MAXVAL)))) + { + /* Scankey has a valid/comparable sk_argument value */ + result = _bt_compare_array_skey(&so->orderProcs[ikey], + tupdatum, tupnull, + cur->sk_argument, cur); + + if (result == 0) + { + /* + * Interpret result in a way that takes NEXT/PRIOR into + * account + */ + if (cur->sk_flags & SK_BT_NEXT) + result = -1; + else if (cur->sk_flags & SK_BT_PRIOR) + result = 1; + + Assert(result == 0 || (cur->sk_flags & SK_BT_SKIP)); + } + } + else + { + BTArrayKeyInfo *array = NULL; + + /* + * Current array element/array = scan key value is a sentinel + * value that represents the lowest (or highest) possible value + * that's still within the range of the array. + * + * Like _bt_first, we only see MINVAL keys during forwards scans + * (and similarly only see MAXVAL keys during backwards scans). + * Even if the scan's direction changes, we'll stop at some higher + * order key before we can ever reach any MAXVAL (or MINVAL) keys. + * (However, unlike _bt_first we _can_ get to keys marked either + * NEXT or PRIOR, regardless of the scan's current direction.) + */ + Assert(ScanDirectionIsForward(dir) ? + !(cur->sk_flags & SK_BT_MAXVAL) : + !(cur->sk_flags & SK_BT_MINVAL)); + + /* + * There are no valid sk_argument values in MINVAL/MAXVAL keys. + * Check if tupdatum is within the range of skip array instead. + */ + for (int arrayidx = 0; arrayidx < so->numArrayKeys; arrayidx++) + { + array = &so->arrayKeys[arrayidx]; + if (array->scan_key == ikey) + break; + } + + _bt_binsrch_skiparray_skey(false, dir, tupdatum, tupnull, + array, cur, &result); + + if (result == 0) + { + /* + * tupdatum satisfies both low_compare and high_compare, so + * it's time to advance the array keys. + * + * Note: It's possible that the skip array will "advance" from + * its MINVAL (or MAXVAL) representation to an alternative, + * logically equivalent representation of the same value: a + * representation where the = key gets a valid datum in its + * sk_argument. This is only possible when low_compare uses + * the >= strategy (or high_compare uses the <= strategy). + */ + return false; + } + } /* * Does this comparison indicate that caller must _not_ advance the @@ -1017,18 +1605,9 @@ _bt_advance_array_keys(IndexScanDesc scan, BTReadPageState *pstate, */ if (beyond_end_advance) { - int final_elem_dir; - - if (ScanDirectionIsBackward(dir) || !array) - final_elem_dir = 0; - else - final_elem_dir = array->num_elems - 1; - - if (array && array->cur_elem != final_elem_dir) - { - array->cur_elem = final_elem_dir; - cur->sk_argument = array->elem_values[final_elem_dir]; - } + if (array) + _bt_array_set_low_or_high(rel, cur, array, + ScanDirectionIsBackward(dir)); continue; } @@ -1053,18 +1632,9 @@ _bt_advance_array_keys(IndexScanDesc scan, BTReadPageState *pstate, */ if (!all_required_satisfied || cur->sk_attno > tupnatts) { - int first_elem_dir; - - if (ScanDirectionIsForward(dir) || !array) - first_elem_dir = 0; - else - first_elem_dir = array->num_elems - 1; - - if (array && array->cur_elem != first_elem_dir) - { - array->cur_elem = first_elem_dir; - cur->sk_argument = array->elem_values[first_elem_dir]; - } + if (array) + _bt_array_set_low_or_high(rel, cur, array, + ScanDirectionIsForward(dir)); continue; } @@ -1080,14 +1650,22 @@ _bt_advance_array_keys(IndexScanDesc scan, BTReadPageState *pstate, bool cur_elem_trig = (sktrig_required && ikey == sktrig); /* - * Binary search for closest match that's available from the array + * "Binary search" by checking if tupdatum/tupnull are within the + * range of the skip array */ - set_elem = _bt_binsrch_array_skey(&so->orderProcs[ikey], - cur_elem_trig, dir, - tupdatum, tupnull, array, cur, - &result); + if (array->num_elems == -1) + _bt_binsrch_skiparray_skey(cur_elem_trig, dir, + tupdatum, tupnull, array, cur, + &result); - Assert(set_elem >= 0 && set_elem < array->num_elems); + /* + * Binary search for the closest match from the SAOP array + */ + else + set_elem = _bt_binsrch_array_skey(&so->orderProcs[ikey], + cur_elem_trig, dir, + tupdatum, tupnull, array, cur, + &result); } else { @@ -1163,11 +1741,21 @@ _bt_advance_array_keys(IndexScanDesc scan, BTReadPageState *pstate, } } - /* Advance array keys, even when set_elem isn't an exact match */ - if (array && array->cur_elem != set_elem) + /* Advance array keys, even when we don't have an exact match */ + if (array) { - array->cur_elem = set_elem; - cur->sk_argument = array->elem_values[set_elem]; + if (array->num_elems == -1) + { + /* Skip array's new element is tupdatum (or MINVAL/MAXVAL) */ + _bt_skiparray_set_element(rel, cur, array, result, + tupdatum, tupnull); + } + else if (array->cur_elem != set_elem) + { + /* SAOP array's new element is set_elem datum */ + array->cur_elem = set_elem; + cur->sk_argument = array->elem_values[set_elem]; + } } } @@ -1581,10 +2169,11 @@ _bt_verify_keys_with_arraykeys(IndexScanDesc scan) if (array->scan_key != ikey) return false; - if (array->num_elems <= 0) + if (array->num_elems == 0 || array->num_elems < -1) return false; - if (cur->sk_argument != array->elem_values[array->cur_elem]) + if (array->num_elems != -1 && + cur->sk_argument != array->elem_values[array->cur_elem]) return false; if (last_sk_attno > cur->sk_attno) return false; @@ -1914,6 +2503,20 @@ _bt_check_compare(IndexScanDesc scan, ScanDirection dir, continue; } + /* + * A skip array scan key uses one of several sentinel values. We just + * fall back on _bt_tuple_before_array_skeys when we see such a value. + */ + if (key->sk_flags & (SK_BT_MINVAL | SK_BT_MAXVAL | + SK_BT_NEXT | SK_BT_PRIOR)) + { + Assert(key->sk_flags & SK_SEARCHARRAY); + Assert(key->sk_flags & SK_BT_SKIP); + + *continuescan = false; + return false; + } + /* row-comparison keys need special processing */ if (key->sk_flags & SK_ROW_HEADER) { @@ -1939,6 +2542,7 @@ _bt_check_compare(IndexScanDesc scan, ScanDirection dir, else { Assert(key->sk_flags & SK_SEARCHNOTNULL); + Assert(!(key->sk_flags & SK_BT_SKIP)); if (!isNull) continue; /* tuple satisfies this qual */ } |