diff mbox series

[v1,2/3] lib/sort: Introduce rotate() to circular shift an array of elements

Message ID 20210824133351.88179-2-andriy.shevchenko@linux.intel.com (mailing list archive)
State New, archived
Headers show
Series [v1,1/3] lib/sort: Split out choose_swap_func() local helper | expand

Commit Message

Andy Shevchenko Aug. 24, 2021, 1:33 p.m. UTC
In some cases we want to circular shift an array of elements.
Introduce rotate() helper for that.

Signed-off-by: Andy Shevchenko <andriy.shevchenko@linux.intel.com>
---
 include/linux/sort.h |  3 +++
 lib/sort.c           | 61 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 64 insertions(+)

Comments

Rasmus Villemoes Aug. 25, 2021, 7:05 a.m. UTC | #1
On 24/08/2021 15.33, Andy Shevchenko wrote:
> In some cases we want to circular shift an array of elements.
> Introduce rotate() helper for that.
> 
> Signed-off-by: Andy Shevchenko <andriy.shevchenko@linux.intel.com>
> ---
>  include/linux/sort.h |  3 +++
>  lib/sort.c           | 61 ++++++++++++++++++++++++++++++++++++++++++++
>  2 files changed, 64 insertions(+)
> 
> diff --git a/include/linux/sort.h b/include/linux/sort.h
> index b5898725fe9d..c881acb12ffc 100644
> --- a/include/linux/sort.h
> +++ b/include/linux/sort.h
> @@ -13,4 +13,7 @@ void sort(void *base, size_t num, size_t size,
>  	  cmp_func_t cmp_func,
>  	  swap_func_t swap_func);
>  
> +void rotate(void *base, size_t num, size_t size, size_t by,
> +	    swap_func_t swap_func);
> +
>  #endif
> diff --git a/lib/sort.c b/lib/sort.c
> index d9b2f5b73620..b9243f8db34b 100644
> --- a/lib/sort.c
> +++ b/lib/sort.c
> @@ -14,6 +14,7 @@
>  
>  #include <linux/types.h>
>  #include <linux/export.h>
> +#include <linux/minmax.h>
>  #include <linux/sort.h>
>  
>  /**
> @@ -275,3 +276,63 @@ void sort(void *base, size_t num, size_t size,
>  	return sort_r(base, num, size, _CMP_WRAPPER, swap_func, cmp_func);
>  }
>  EXPORT_SYMBOL(sort);
> +
> +/**
> + * rotate - rotate an array of elements by a number of elements
> + * @base: pointer to data to sort

sort?

> + * @num: number of elements
> + * @size: size of each element
> + * @by: number of elements to rotate by

Perhaps add (0 <= @by < @num) or something like that, and/or start the
implementation with "if (num <= 1) return; if (by >= num) by %= num;"

> + * @swap_func: pointer to swap function or NULL
> + *
> + * Helper function to advance all the elements of a circular buffer by
> + * @by positions.
> + */
> +void rotate(void *base, size_t num, size_t size, size_t by,
> +	    swap_func_t swap_func)
> +{
> +	struct {
> +		size_t begin, end;
> +	} arr[2] = {
> +		{ .begin = 0, .end = by - 1 },
> +		{ .begin = by, .end = num - 1 },
> +	};

I see you just copied-and-adapted, but I think the code would be much
easier to read without all those plus/minus ones all over.

> +	swap_func = choose_swap_func(swap_func, base, size);
> +
> +#define CHUNK_SIZE(a) ((a)->end - (a)->begin + 1)
> +
> +	/* Loop as long as we have out-of-place entries */
> +	while (CHUNK_SIZE(&arr[0]) && CHUNK_SIZE(&arr[1])) {
> +		size_t size0, i;
> +
> +		/*
> +		 * Find the number of entries that can be arranged on this
> +		 * iteration.
> +		 */
> +		size0 = min(CHUNK_SIZE(&arr[0]), CHUNK_SIZE(&arr[1]));
> +
> +		/* Swap the entries in two parts of the array */
> +		for (i = 0; i < size0; i++) {
> +			void *a = base + size * (arr[0].begin + i);
> +			void *b = base + size * (arr[1].begin + i);
> +
> +			do_swap(a, b, size, swap_func);
> +		}
> +
> +		if (CHUNK_SIZE(&arr[0]) > CHUNK_SIZE(&arr[1])) {
> +			/* The end of the first array remains unarranged */
> +			arr[0].begin += size0;
> +		} else {
> +			/*
> +			 * The first array is fully arranged so we proceed
> +			 * handling the next one.
> +			 */
> +			arr[0].begin = arr[1].begin;
> +			arr[0].end = arr[1].begin + size0 - 1;
> +			arr[1].begin += size0;
> +		}
> +	}

Perhaps add a small self-test, it's not at all obvious how this works
(perhaps it's some standard CS101 algorithm for rotating in-place, I
don't know, but even then an implementation can have off-by-ones and
corner cases).

for (len = 1; len < 15; ++len) {
  for (by = 0; by <= len; ++by) {
    for (i = 0; i < len; ++i)
      arr[i] = i;
    rotate(arr, len, sizeof(int), by);
    for (i = 0; i < len; ++i)
      if (arr[i] != (i + by) % len)
        error();
  }
}

Rasmus
Sakari Ailus Aug. 25, 2021, 8:08 a.m. UTC | #2
Hi Rasmus, Andy,

On Wed, Aug 25, 2021 at 09:05:19AM +0200, Rasmus Villemoes wrote:
> On 24/08/2021 15.33, Andy Shevchenko wrote:
> > In some cases we want to circular shift an array of elements.
> > Introduce rotate() helper for that.
> > 
> > Signed-off-by: Andy Shevchenko <andriy.shevchenko@linux.intel.com>
> > ---
> >  include/linux/sort.h |  3 +++
> >  lib/sort.c           | 61 ++++++++++++++++++++++++++++++++++++++++++++
> >  2 files changed, 64 insertions(+)
> > 
> > diff --git a/include/linux/sort.h b/include/linux/sort.h
> > index b5898725fe9d..c881acb12ffc 100644
> > --- a/include/linux/sort.h
> > +++ b/include/linux/sort.h
> > @@ -13,4 +13,7 @@ void sort(void *base, size_t num, size_t size,
> >  	  cmp_func_t cmp_func,
> >  	  swap_func_t swap_func);
> >  
> > +void rotate(void *base, size_t num, size_t size, size_t by,
> > +	    swap_func_t swap_func);
> > +
> >  #endif
> > diff --git a/lib/sort.c b/lib/sort.c
> > index d9b2f5b73620..b9243f8db34b 100644
> > --- a/lib/sort.c
> > +++ b/lib/sort.c
> > @@ -14,6 +14,7 @@
> >  
> >  #include <linux/types.h>
> >  #include <linux/export.h>
> > +#include <linux/minmax.h>
> >  #include <linux/sort.h>
> >  
> >  /**
> > @@ -275,3 +276,63 @@ void sort(void *base, size_t num, size_t size,
> >  	return sort_r(base, num, size, _CMP_WRAPPER, swap_func, cmp_func);
> >  }
> >  EXPORT_SYMBOL(sort);
> > +
> > +/**
> > + * rotate - rotate an array of elements by a number of elements
> > + * @base: pointer to data to sort
> 
> sort?
> 
> > + * @num: number of elements
> > + * @size: size of each element
> > + * @by: number of elements to rotate by
> 
> Perhaps add (0 <= @by < @num) or something like that, and/or start the
> implementation with "if (num <= 1) return; if (by >= num) by %= num;"

The latter could be done unconditionally.

> 
> > + * @swap_func: pointer to swap function or NULL
> > + *
> > + * Helper function to advance all the elements of a circular buffer by
> > + * @by positions.
> > + */
> > +void rotate(void *base, size_t num, size_t size, size_t by,
> > +	    swap_func_t swap_func)
> > +{
> > +	struct {
> > +		size_t begin, end;
> > +	} arr[2] = {
> > +		{ .begin = 0, .end = by - 1 },
> > +		{ .begin = by, .end = num - 1 },
> > +	};
> 
> I see you just copied-and-adapted, but I think the code would be much
> easier to read without all those plus/minus ones all over.

Now that I think about it, they can be just removed. In that case end
refers to the element following end, rather than the last element.

> 
> > +	swap_func = choose_swap_func(swap_func, base, size);
> > +
> > +#define CHUNK_SIZE(a) ((a)->end - (a)->begin + 1)
> > +
> > +	/* Loop as long as we have out-of-place entries */
> > +	while (CHUNK_SIZE(&arr[0]) && CHUNK_SIZE(&arr[1])) {
> > +		size_t size0, i;
> > +
> > +		/*
> > +		 * Find the number of entries that can be arranged on this
> > +		 * iteration.
> > +		 */
> > +		size0 = min(CHUNK_SIZE(&arr[0]), CHUNK_SIZE(&arr[1]));
> > +
> > +		/* Swap the entries in two parts of the array */
> > +		for (i = 0; i < size0; i++) {
> > +			void *a = base + size * (arr[0].begin + i);
> > +			void *b = base + size * (arr[1].begin + i);
> > +
> > +			do_swap(a, b, size, swap_func);
> > +		}
> > +
> > +		if (CHUNK_SIZE(&arr[0]) > CHUNK_SIZE(&arr[1])) {
> > +			/* The end of the first array remains unarranged */
> > +			arr[0].begin += size0;
> > +		} else {
> > +			/*
> > +			 * The first array is fully arranged so we proceed
> > +			 * handling the next one.
> > +			 */
> > +			arr[0].begin = arr[1].begin;
> > +			arr[0].end = arr[1].begin + size0 - 1;
> > +			arr[1].begin += size0;
> > +		}
> > +	}
> 
> Perhaps add a small self-test, it's not at all obvious how this works
> (perhaps it's some standard CS101 algorithm for rotating in-place, I
> don't know, but even then an implementation can have off-by-ones and
> corner cases).

I don't know, I wrote this to fix a bug in the ipu3-cio2 driver. ;-) The
hardware, and so the arguments, were static. Nice to see it would be useful
elsewhere almost as-is.

> 
> for (len = 1; len < 15; ++len) {
>   for (by = 0; by <= len; ++by) {
>     for (i = 0; i < len; ++i)
>       arr[i] = i;
>     rotate(arr, len, sizeof(int), by);
>     for (i = 0; i < len; ++i)
>       if (arr[i] != (i + by) % len)
>         error();
>   }
> }

Makes sense to add something like that.

After addressing the comments, for patches from 1 to 3:

Acked-by: Sakari Ailus <sakari.ailus@linux.intel.com>
Rasmus Villemoes Aug. 25, 2021, 9:29 a.m. UTC | #3
On 25/08/2021 10.08, Sakari Ailus wrote:
> Hi Rasmus, Andy,
> 

>>> + * @num: number of elements
>>> + * @size: size of each element
>>> + * @by: number of elements to rotate by
>>
>> Perhaps add (0 <= @by < @num) or something like that, and/or start the
>> implementation with "if (num <= 1) return; if (by >= num) by %= num;"
> 
> The latter could be done unconditionally.

Yes (provided num is tested at least for being non-zero first, but then
it's mostly free to check <= 1 instead), but in the vast majority of
cases the caller would pass a sane value of by, and an unconditional %=
would thus waste 100+ clock cycles for nothing.

>>> +	struct {
>>> +		size_t begin, end;
>>> +	} arr[2] = {
>>> +		{ .begin = 0, .end = by - 1 },
>>> +		{ .begin = by, .end = num - 1 },
>>> +	};
>>
>> I see you just copied-and-adapted, but I think the code would be much
>> easier to read without all those plus/minus ones all over.
> 
> Now that I think about it, they can be just removed. In that case end
> refers to the element following end, rather than the last element.

Yes, as we almost always do array indexing in C... the math simply ends
up coming out more naturally that way in the majority of cases.

>> Perhaps add a small self-test, it's not at all obvious how this works
>> (perhaps it's some standard CS101 algorithm for rotating in-place, I
>> don't know, but even then an implementation can have off-by-ones and
>> corner cases).
> 
> I don't know, I wrote this to fix a bug in the ipu3-cio2 driver. ;-) The
> hardware, and so the arguments, were static. Nice to see it would be useful
> elsewhere almost as-is.

Well, Andy hasn't actually shown that it would be useful anywhere else.
I think I'd like to see another user. Just doing "move this helper to
lib/ because we can reuse choose-a-proper-swap-func and thus implement
this perhaps a tiny bit faster" without considering whether it's even
performance-critical in the sole user is not a good idea IMO.

Especially since it can affect code generation of the much more
important (at least, has many more users) sort() function - the
do_swap() function grows another user, so could make the compiler end up
choosing not to inline it anymore.

There's another slightly simpler way to implement rotate(), which might
end up having more users (though I can't find any currently): Add a
reverse() helper, then rotate() can be done as reverse(a, 0, n);
reverse(a, 0, k); reverse(a, k, n-k);. If my math is right, the current
suggested rotate() ends up doing n-gcd(n,k) swaps, while the
implementation in terms of a reverse() would do n-1 if either n or k is
odd, otherwise n, calls to swap().

Rasmus
Andy Shevchenko Aug. 25, 2021, 9:55 a.m. UTC | #4
On Wed, Aug 25, 2021 at 11:29:12AM +0200, Rasmus Villemoes wrote:
> On 25/08/2021 10.08, Sakari Ailus wrote:

...

> Well, Andy hasn't actually shown that it would be useful anywhere else.
> I think I'd like to see another user.

I have found another potential user, but in their case (it's networking)
the simple for-loop with swap() in use seems efficient enough (the element size
is 8 bytes there).

I haven't checked for fully custom implementations of rotate (where no
swap() macro is in use); there might be another user lurking around.

> Just doing "move this helper to
> lib/ because we can reuse choose-a-proper-swap-func and thus implement
> this perhaps a tiny bit faster" without considering whether it's even
> performance-critical in the sole user is not a good idea IMO.

I agree with you.

> Especially since it can affect code generation of the much more
> important (at least, has many more users) sort() function - the
> do_swap() function grows another user, so could make the compiler end up
> choosing not to inline it anymore.

This can be fixed by always inlining?

> There's another slightly simpler way to implement rotate(), which might
> end up having more users (though I can't find any currently): Add a
> reverse() helper, then rotate() can be done as reverse(a, 0, n);
> reverse(a, 0, k); reverse(a, k, n-k);. If my math is right, the current
> suggested rotate() ends up doing n-gcd(n,k) swaps, while the
> implementation in terms of a reverse() would do n-1 if either n or k is
> odd, otherwise n, calls to swap().

Interesting idea. And this, btw, may have more users per se.
diff mbox series

Patch

diff --git a/include/linux/sort.h b/include/linux/sort.h
index b5898725fe9d..c881acb12ffc 100644
--- a/include/linux/sort.h
+++ b/include/linux/sort.h
@@ -13,4 +13,7 @@  void sort(void *base, size_t num, size_t size,
 	  cmp_func_t cmp_func,
 	  swap_func_t swap_func);
 
+void rotate(void *base, size_t num, size_t size, size_t by,
+	    swap_func_t swap_func);
+
 #endif
diff --git a/lib/sort.c b/lib/sort.c
index d9b2f5b73620..b9243f8db34b 100644
--- a/lib/sort.c
+++ b/lib/sort.c
@@ -14,6 +14,7 @@ 
 
 #include <linux/types.h>
 #include <linux/export.h>
+#include <linux/minmax.h>
 #include <linux/sort.h>
 
 /**
@@ -275,3 +276,63 @@  void sort(void *base, size_t num, size_t size,
 	return sort_r(base, num, size, _CMP_WRAPPER, swap_func, cmp_func);
 }
 EXPORT_SYMBOL(sort);
+
+/**
+ * rotate - rotate an array of elements by a number of elements
+ * @base: pointer to data to rotate
+ * @num: number of elements
+ * @size: size of each element
+ * @by: number of elements to rotate by (taken modulo @num)
+ * @swap_func: pointer to swap function or NULL
+ *
+ * Rotate in place so that the element at index @by moves to index 0,
+ * i.e. new[i] == old[(i + @by) % @num]. Nothing is done if @num <= 1.
+ */
+void rotate(void *base, size_t num, size_t size, size_t by,
+	    swap_func_t swap_func)
+{
+	struct {
+		size_t begin, end;	/* half-open range [begin, end) */
+	} arr[2];
+
+	if (num <= 1)
+		return;
+	/* Callers normally pass a sane @by, so only divide when needed */
+	if (by >= num)
+		by %= num;
+
+	arr[0].begin = 0;
+	arr[0].end = by;
+	arr[1].begin = by;
+	arr[1].end = num;
+
+	swap_func = choose_swap_func(swap_func, base, size);
+
+#define CHUNK_SIZE(a) ((a)->end - (a)->begin)
+	/* Loop as long as we have out-of-place entries */
+	while (CHUNK_SIZE(&arr[0]) && CHUNK_SIZE(&arr[1])) {
+		/* Number of entries that can be arranged on this iteration */
+		size_t size0 = min(CHUNK_SIZE(&arr[0]), CHUNK_SIZE(&arr[1]));
+		size_t i;
+
+		/* Swap the entries in two parts of the array */
+		for (i = 0; i < size0; i++) {
+			void *a = base + size * (arr[0].begin + i);
+			void *b = base + size * (arr[1].begin + i);
+
+			do_swap(a, b, size, swap_func);
+		}
+
+		if (CHUNK_SIZE(&arr[0]) > CHUNK_SIZE(&arr[1])) {
+			/* The end of the first array remains unarranged */
+			arr[0].begin += size0;
+		} else {
+			/* First array is fully arranged, handle the next one */
+			arr[0].begin = arr[1].begin;
+			arr[0].end = arr[1].begin + size0;
+			arr[1].begin += size0;
+		}
+	}
+#undef CHUNK_SIZE
+}
+EXPORT_SYMBOL(rotate);