@@ -930,6 +930,7 @@ TRACE_EVENT(xfarray_sort_stats,
__field(unsigned long long, loads)
__field(unsigned long long, stores)
__field(unsigned long long, compares)
+ __field(unsigned long long, heapsorts)
#endif
__field(unsigned int, max_stack_depth)
__field(unsigned int, max_stack_used)
@@ -941,6 +942,7 @@ TRACE_EVENT(xfarray_sort_stats,
__entry->loads = si->loads;
__entry->stores = si->stores;
__entry->compares = si->compares;
+ __entry->heapsorts = si->heapsorts;
#endif
__entry->max_stack_depth = si->max_stack_depth;
__entry->max_stack_used = si->max_stack_used;
@@ -948,7 +950,7 @@ TRACE_EVENT(xfarray_sort_stats,
),
TP_printk(
#ifdef DEBUG
- "xfino 0x%lx loads %llu stores %llu compares %llu stack_depth %u/%u error %d",
+ "xfino 0x%lx loads %llu stores %llu compares %llu heapsorts %llu stack_depth %u/%u error %d",
#else
"xfino 0x%lx stack_depth %u/%u error %d",
#endif
@@ -957,6 +959,7 @@ TRACE_EVENT(xfarray_sort_stats,
__entry->loads,
__entry->stores,
__entry->compares,
+ __entry->heapsorts,
#endif
__entry->max_stack_used,
__entry->max_stack_depth,
@@ -375,10 +375,12 @@ xfarray_load_next(
# define xfarray_sort_bump_loads(si) do { (si)->loads++; } while (0)
# define xfarray_sort_bump_stores(si) do { (si)->stores++; } while (0)
# define xfarray_sort_bump_compares(si) do { (si)->compares++; } while (0)
+# define xfarray_sort_bump_heapsorts(si) do { (si)->heapsorts++; } while (0)
#else
# define xfarray_sort_bump_loads(si)
# define xfarray_sort_bump_stores(si)
# define xfarray_sort_bump_compares(si)
+# define xfarray_sort_bump_heapsorts(si)
#endif /* DEBUG */
/* Load an array element for sorting. */
@@ -441,15 +443,19 @@ xfarray_sortinfo_alloc(
/*
* Tail-call recursion during the partitioning phase means that
* quicksort will never recurse more than log2(nr) times. We need one
- * extra level of stack to hold the initial parameters.
+ * extra level of stack to hold the initial parameters. In-memory
+ * sort will always take care of the last few levels of recursion for
+ * us, so we can reduce the stack depth by that much.
*/
- max_stack_depth = ilog2(array->nr) + 1;
+ max_stack_depth = ilog2(array->nr) + 1 - (XFARRAY_ISORT_SHIFT - 1);
+ if (max_stack_depth < 1)
+ max_stack_depth = 1;
/* Each level of quicksort uses a lo and a hi index */
nr_bytes += max_stack_depth * sizeof(xfarray_idx_t) * 2;
- /* One record for the pivot */
- nr_bytes += array->obj_size;
+ /* Scratchpad for in-memory sort, or one record for the pivot */
+ nr_bytes += (XFARRAY_ISORT_NR * array->obj_size);
si = kvzalloc(nr_bytes, XCHK_GFP_FLAGS);
if (!si)
@@ -491,7 +497,7 @@ xfarray_sort_terminated(
return false;
}
-/* Do we want an insertion sort? */
+/* Do we want an in-memory sort? */
static inline bool
xfarray_want_isort(
struct xfarray_sortinfo *si,
@@ -499,10 +505,10 @@ xfarray_want_isort(
xfarray_idx_t end)
{
/*
- * For array subsets smaller than 8 elements, it's slightly faster to
- * use insertion sort than quicksort's stack machine.
+ * For array subsets that fit in the scratchpad, it's much faster to
+ * use the kernel's heapsort than quicksort's stack machine.
*/
- return (end - start) < 8;
+ return (end - start) < XFARRAY_ISORT_NR;
}
/* Return the scratch space within the sortinfo structure. */
@@ -512,10 +518,8 @@ static inline void *xfarray_sortinfo_isort_scratch(struct xfarray_sortinfo *si)
}
/*
- * Perform an insertion sort on a subset of the array.
- * Though insertion sort is an O(n^2) algorithm, for small set sizes it's
- * faster than quicksort's stack machine, so we let it take over for that.
- * This ought to be replaced with something more efficient.
+ * Sort a small number of array records using scratchpad memory. The records
+ * need not be contiguous in the xfile's memory pages.
*/
STATIC int
xfarray_isort(
@@ -523,114 +527,23 @@ xfarray_isort(
xfarray_idx_t lo,
xfarray_idx_t hi)
{
- void *a = xfarray_sortinfo_isort_scratch(si);
- void *b = xfarray_scratch(si->array);
- xfarray_idx_t tmp;
- xfarray_idx_t i;
- xfarray_idx_t run;
+ void *scratch = xfarray_sortinfo_isort_scratch(si);
+ loff_t lo_pos = xfarray_pos(si->array, lo);
+ loff_t len = xfarray_pos(si->array, hi - lo + 1);
int error;
trace_xfarray_isort(si, lo, hi);
- /*
- * Move the smallest element in a[lo..hi] to a[lo]. This
- * simplifies the loop control logic below.
- */
- tmp = lo;
- error = xfarray_sort_load(si, tmp, b);
+ xfarray_sort_bump_loads(si);
+ error = xfile_obj_load(si->array->xfile, scratch, len, lo_pos);
if (error)
return error;
- for (run = lo + 1; run <= hi; run++) {
- /* if a[run] < a[tmp], tmp = run */
- error = xfarray_sort_load(si, run, a);
- if (error)
- return error;
- if (xfarray_sort_cmp(si, a, b) < 0) {
- tmp = run;
- memcpy(b, a, si->array->obj_size);
- }
- if (xfarray_sort_terminated(si, &error))
- return error;
- }
+ xfarray_sort_bump_heapsorts(si);
+ sort(scratch, hi - lo + 1, si->array->obj_size, si->cmp_fn, NULL);
- /*
- * The smallest element is a[tmp]; swap with a[lo] if tmp != lo.
- * Recall that a[tmp] is already in *b.
- */
- if (tmp != lo) {
- error = xfarray_sort_load(si, lo, a);
- if (error)
- return error;
- error = xfarray_sort_store(si, tmp, a);
- if (error)
- return error;
- error = xfarray_sort_store(si, lo, b);
- if (error)
- return error;
- }
-
- /*
- * Perform an insertion sort on a[lo+1..hi]. We already made sure
- * that the smallest value in the original range is now in a[lo],
- * so the inner loop should never underflow.
- *
- * For each a[lo+2..hi], make sure it's in the correct position
- * with respect to the elements that came before it.
- */
- for (run = lo + 2; run <= hi; run++) {
- error = xfarray_sort_load(si, run, a);
- if (error)
- return error;
-
- /*
- * Find the correct place for a[run] by walking leftwards
- * towards the start of the range until a[tmp] is no longer
- * greater than a[run].
- */
- tmp = run - 1;
- error = xfarray_sort_load(si, tmp, b);
- if (error)
- return error;
- while (xfarray_sort_cmp(si, a, b) < 0) {
- tmp--;
- error = xfarray_sort_load(si, tmp, b);
- if (error)
- return error;
-
- if (xfarray_sort_terminated(si, &error))
- return error;
- }
- tmp++;
-
- /*
- * If tmp != run, then a[tmp..run-1] are all less than a[run],
- * so right barrel roll a[tmp..run] to get this range in
- * sorted order.
- */
- if (tmp == run)
- continue;
-
- for (i = run; i >= tmp; i--) {
- error = xfarray_sort_load(si, i - 1, b);
- if (error)
- return error;
- error = xfarray_sort_store(si, i, b);
- if (error)
- return error;
-
- if (xfarray_sort_terminated(si, &error))
- return error;
- }
- error = xfarray_sort_store(si, tmp, a);
- if (error)
- return error;
-
- if (xfarray_sort_terminated(si, &error))
- return error;
- }
-
- return 0;
+ xfarray_sort_bump_stores(si);
+ return xfile_obj_store(si->array->xfile, scratch, len, lo_pos);
}
/* Return a pointer to the xfarray pivot record within the sortinfo struct. */
@@ -784,9 +697,8 @@ xfarray_qsort_push(
* current stack frame. This guarantees that we won't need more than
* log2(nr) stack space.
*
- * 4. Use insertion sort for small sets since since insertion sort is faster
- * for small, mostly sorted array segments. In the author's experience,
- * substituting insertion sort for arrays smaller than 8 elements yields
+ * 4. For small sets, load the records into the scratchpad and run heapsort on
+ * them because that is very fast. In the author's experience, this yields
* a ~10% reduction in runtime.
*/
@@ -59,6 +59,10 @@ int xfarray_load_next(struct xfarray *array, xfarray_idx_t *idx, void *rec);
typedef cmp_func_t xfarray_cmp_fn;
+/* Perform an in-memory heapsort for small subsets. */
+#define XFARRAY_ISORT_SHIFT (4)
+#define XFARRAY_ISORT_NR (1U << XFARRAY_ISORT_SHIFT)
+
struct xfarray_sortinfo {
struct xfarray *array;
@@ -82,6 +86,7 @@ struct xfarray_sortinfo {
uint64_t loads;
uint64_t stores;
uint64_t compares;
+ uint64_t heapsorts;
#endif
/*
@@ -100,11 +105,10 @@ struct xfarray_sortinfo {
*
* union {
*
- * If for a given subset we decide to use an insertion sort, we use the
- * scratchpad record after the xfarray and a second scratchpad record
- * here to compare items:
+ * If for a given subset we decide to use an in-memory sort, we use a
+ * block of scratchpad records here to compare items:
*
- * xfarray_rec_t scratch;
+ * xfarray_rec_t scratch[ISORT_NR];
*
* Otherwise, we want to partition the records to partition the array.
* We store the chosen pivot record here and use the xfarray scratchpad