diff mbox series

[2/2] kcompressd: Add Kcompressd for accelerated zram compression

Message ID 20250307120141.1566673-3-qun-wei.lin@mediatek.com (mailing list archive)
State New
Headers show
Series Improve Zram by separating compression context from kswapd | expand

Commit Message

Qun-wei Lin (林群崴) March 7, 2025, 12:01 p.m. UTC
Introduced Kcompressd to offload zram page compression, improving
system efficiency by handling compression separately from memory
reclaiming. Added necessary configurations and dependencies.

Signed-off-by: Qun-Wei Lin <qun-wei.lin@mediatek.com>
---
 drivers/block/zram/Kconfig      |  11 ++
 drivers/block/zram/Makefile     |   3 +-
 drivers/block/zram/kcompressd.c | 340 ++++++++++++++++++++++++++++++++
 drivers/block/zram/kcompressd.h |  25 +++
 drivers/block/zram/zram_drv.c   |  22 ++-
 5 files changed, 397 insertions(+), 4 deletions(-)
 create mode 100644 drivers/block/zram/kcompressd.c
 create mode 100644 drivers/block/zram/kcompressd.h

Comments

Barry Song March 7, 2025, 7:41 p.m. UTC | #1
On Sat, Mar 8, 2025 at 1:02 AM Qun-Wei Lin <qun-wei.lin@mediatek.com> wrote:
>
> Introduced Kcompressd to offload zram page compression, improving
> system efficiency by handling compression separately from memory
> reclaiming. Added necessary configurations and dependencies.
>
> Signed-off-by: Qun-Wei Lin <qun-wei.lin@mediatek.com>
> ---
>  drivers/block/zram/Kconfig      |  11 ++
>  drivers/block/zram/Makefile     |   3 +-
>  drivers/block/zram/kcompressd.c | 340 ++++++++++++++++++++++++++++++++
>  drivers/block/zram/kcompressd.h |  25 +++
>  drivers/block/zram/zram_drv.c   |  22 ++-
>  5 files changed, 397 insertions(+), 4 deletions(-)
>  create mode 100644 drivers/block/zram/kcompressd.c
>  create mode 100644 drivers/block/zram/kcompressd.h
>
> diff --git a/drivers/block/zram/Kconfig b/drivers/block/zram/Kconfig
> index 402b7b175863..f0a1b574f770 100644
> --- a/drivers/block/zram/Kconfig
> +++ b/drivers/block/zram/Kconfig
> @@ -145,3 +145,14 @@ config ZRAM_MULTI_COMP
>           re-compress pages using a potentially slower but more effective
>           compression algorithm. Note, that IDLE page recompression
>           requires ZRAM_TRACK_ENTRY_ACTIME.
> +
> +config KCOMPRESSD
> +       tristate "Kcompressd: Accelerated zram compression"
> +       depends on ZRAM
> +       help
> +         Kcompressd creates multiple daemons to accelerate the compression of pages
> +         in zram, offloading this time-consuming task from the zram driver.
> +
> +         This approach improves system efficiency by handling page compression separately,
> +         which was originally done by kswapd or direct reclaim.

For direct reclaim, we were previously able to compress using multiple CPUs
with multi-threading.
After your patch, it seems that only a single thread/CPU is used for compression
so it won't necessarily improve direct reclaim performance?

Even for kswapd, we used to have multiple threads like [kswapd0], [kswapd1],
and [kswapd2] for different nodes. Now, are we also limited to just one thread?
I also wonder if this could be handled at the vmscan level instead of the zram
level. then it might potentially help other sync devices or even zswap later.

But I agree that for phones, modifying zram seems like an easier starting
point. However, relying on a single thread isn't always the best approach.


> +
> diff --git a/drivers/block/zram/Makefile b/drivers/block/zram/Makefile
> index 0fdefd576691..23baa5dfceb9 100644
> --- a/drivers/block/zram/Makefile
> +++ b/drivers/block/zram/Makefile
> @@ -9,4 +9,5 @@ zram-$(CONFIG_ZRAM_BACKEND_ZSTD)        += backend_zstd.o
>  zram-$(CONFIG_ZRAM_BACKEND_DEFLATE)    += backend_deflate.o
>  zram-$(CONFIG_ZRAM_BACKEND_842)                += backend_842.o
>
> -obj-$(CONFIG_ZRAM)     +=      zram.o
> +obj-$(CONFIG_ZRAM)             += zram.o
> +obj-$(CONFIG_KCOMPRESSD)       += kcompressd.o
> diff --git a/drivers/block/zram/kcompressd.c b/drivers/block/zram/kcompressd.c
> new file mode 100644
> index 000000000000..195b7e386869
> --- /dev/null
> +++ b/drivers/block/zram/kcompressd.c
> @@ -0,0 +1,340 @@
> +// SPDX-License-Identifier: GPL-2.0
> +/*
> + * Copyright (C) 2024 MediaTek Inc.
> + */
> +
> +#include <linux/module.h>
> +#include <linux/kernel.h>
> +#include <linux/bio.h>
> +#include <linux/bitops.h>
> +#include <linux/freezer.h>
> +#include <linux/kernel.h>
> +#include <linux/psi.h>
> +#include <linux/kfifo.h>
> +#include <linux/swap.h>
> +#include <linux/delay.h>
> +
> +#include "kcompressd.h"
> +
> +#define INIT_QUEUE_SIZE                4096
> +#define DEFAULT_NR_KCOMPRESSD  4
> +
> +static atomic_t enable_kcompressd;
> +static unsigned int nr_kcompressd;
> +static unsigned int queue_size_per_kcompressd;
> +static struct kcompress *kcompress;
> +
> +enum run_state {
> +       KCOMPRESSD_NOT_STARTED = 0,
> +       KCOMPRESSD_RUNNING,
> +       KCOMPRESSD_SLEEPING,
> +};
> +
> +struct kcompressd_para {
> +       wait_queue_head_t *kcompressd_wait;
> +       struct kfifo *write_fifo;
> +       atomic_t *running;
> +};
> +
> +static struct kcompressd_para *kcompressd_para;
> +static BLOCKING_NOTIFIER_HEAD(kcompressd_notifier_list);
> +
> +struct write_work {
> +       void *mem;
> +       struct bio *bio;
> +       compress_callback cb;
> +};
> +
> +int kcompressd_enabled(void)
> +{
> +       return likely(atomic_read(&enable_kcompressd));
> +}
> +EXPORT_SYMBOL(kcompressd_enabled);
> +
> +static void kcompressd_try_to_sleep(struct kcompressd_para *p)
> +{
> +       DEFINE_WAIT(wait);
> +
> +       if (!kfifo_is_empty(p->write_fifo))
> +               return;
> +
> +       if (freezing(current) || kthread_should_stop())
> +               return;
> +
> +       atomic_set(p->running, KCOMPRESSD_SLEEPING);
> +       prepare_to_wait(p->kcompressd_wait, &wait, TASK_INTERRUPTIBLE);
> +
> +       /*
> +        * After a short sleep, check if it was a premature sleep. If not, then
> +        * go fully to sleep until explicitly woken up.
> +        */
> +       if (!kthread_should_stop() && kfifo_is_empty(p->write_fifo))
> +               schedule();
> +
> +       finish_wait(p->kcompressd_wait, &wait);
> +       atomic_set(p->running, KCOMPRESSD_RUNNING);
> +}
> +
> +static int kcompressd(void *para)
> +{
> +       struct task_struct *tsk = current;
> +       struct kcompressd_para *p = (struct kcompressd_para *)para;
> +
> +       tsk->flags |= PF_MEMALLOC | PF_KSWAPD;
> +       set_freezable();
> +
> +       while (!kthread_should_stop()) {
> +               bool ret;
> +
> +               kcompressd_try_to_sleep(p);
> +               ret = try_to_freeze();
> +               if (kthread_should_stop())
> +                       break;
> +
> +               if (ret)
> +                       continue;
> +
> +               while (!kfifo_is_empty(p->write_fifo)) {
> +                       struct write_work entry;
> +
> +                       if (sizeof(struct write_work) == kfifo_out(p->write_fifo,
> +                                               &entry, sizeof(struct write_work))) {
> +                               entry.cb(entry.mem, entry.bio);
> +                               bio_put(entry.bio);
> +                       }
> +               }
> +
> +       }
> +
> +       tsk->flags &= ~(PF_MEMALLOC | PF_KSWAPD);
> +       atomic_set(p->running, KCOMPRESSD_NOT_STARTED);
> +       return 0;
> +}
> +
> +static int init_write_queue(void)
> +{
> +       int i;
> +       unsigned int queue_len = queue_size_per_kcompressd * sizeof(struct write_work);
> +
> +       for (i = 0; i < nr_kcompressd; i++) {
> +               if (kfifo_alloc(&kcompress[i].write_fifo,
> +                                       queue_len, GFP_KERNEL)) {
> +                       pr_err("Failed to alloc kfifo %d\n", i);
> +                       return -ENOMEM;
> +               }
> +       }
> +       return 0;
> +}
> +
> +static void clean_bio_queue(int idx)
> +{
> +       struct write_work entry;
> +
> +       while (sizeof(struct write_work) == kfifo_out(&kcompress[idx].write_fifo,
> +                               &entry, sizeof(struct write_work))) {
> +               bio_put(entry.bio);
> +               entry.cb(entry.mem, entry.bio);
> +       }
> +       kfifo_free(&kcompress[idx].write_fifo);
> +}
> +
> +static int kcompress_update(void)
> +{
> +       int i;
> +       int ret;
> +
> +       kcompress = kvmalloc_array(nr_kcompressd, sizeof(struct kcompress), GFP_KERNEL);
> +       if (!kcompress)
> +               return -ENOMEM;
> +
> +       kcompressd_para = kvmalloc_array(nr_kcompressd, sizeof(struct kcompressd_para), GFP_KERNEL);
> +       if (!kcompressd_para)
> +               return -ENOMEM;
> +
> +       ret = init_write_queue();
> +       if (ret) {
> +               pr_err("Initialization of writing to FIFOs failed!!\n");
> +               return ret;
> +       }
> +
> +       for (i = 0; i < nr_kcompressd; i++) {
> +               init_waitqueue_head(&kcompress[i].kcompressd_wait);
> +               kcompressd_para[i].kcompressd_wait = &kcompress[i].kcompressd_wait;
> +               kcompressd_para[i].write_fifo = &kcompress[i].write_fifo;
> +               kcompressd_para[i].running = &kcompress[i].running;
> +       }
> +
> +       return 0;
> +}
> +
> +static void stop_all_kcompressd_thread(void)
> +{
> +       int i;
> +
> +       for (i = 0; i < nr_kcompressd; i++) {
> +               kthread_stop(kcompress[i].kcompressd);
> +               kcompress[i].kcompressd = NULL;
> +               clean_bio_queue(i);
> +       }
> +}
> +
> +static int do_nr_kcompressd_handler(const char *val,
> +               const struct kernel_param *kp)
> +{
> +       int ret;
> +
> +       atomic_set(&enable_kcompressd, false);
> +
> +       stop_all_kcompressd_thread();
> +
> +       ret = param_set_int(val, kp);
> +       if (!ret) {
> +               pr_err("Invalid number of kcompressd.\n");
> +               return -EINVAL;
> +       }
> +
> +       ret = init_write_queue();
> +       if (ret) {
> +               pr_err("Initialization of writing to FIFOs failed!!\n");
> +               return ret;
> +       }
> +
> +       atomic_set(&enable_kcompressd, true);
> +
> +       return 0;
> +}
> +
> +static const struct kernel_param_ops param_ops_change_nr_kcompressd = {
> +       .set = &do_nr_kcompressd_handler,
> +       .get = &param_get_uint,
> +       .free = NULL,
> +};
> +
> +module_param_cb(nr_kcompressd, &param_ops_change_nr_kcompressd,
> +               &nr_kcompressd, 0644);
> +MODULE_PARM_DESC(nr_kcompressd, "Number of pre-created daemon for page compression");
> +
> +static int do_queue_size_per_kcompressd_handler(const char *val,
> +               const struct kernel_param *kp)
> +{
> +       int ret;
> +
> +       atomic_set(&enable_kcompressd, false);
> +
> +       stop_all_kcompressd_thread();
> +
> +       ret = param_set_int(val, kp);
> +       if (!ret) {
> +               pr_err("Invalid queue size for kcompressd.\n");
> +               return -EINVAL;
> +       }
> +
> +       ret = init_write_queue();
> +       if (ret) {
> +               pr_err("Initialization of writing to FIFOs failed!!\n");
> +               return ret;
> +       }
> +
> +       pr_info("Queue size for kcompressd was changed: %d\n", queue_size_per_kcompressd);
> +
> +       atomic_set(&enable_kcompressd, true);
> +       return 0;
> +}
> +
> +static const struct kernel_param_ops param_ops_change_queue_size_per_kcompressd = {
> +       .set = &do_queue_size_per_kcompressd_handler,
> +       .get = &param_get_uint,
> +       .free = NULL,
> +};
> +
> +module_param_cb(queue_size_per_kcompressd, &param_ops_change_queue_size_per_kcompressd,
> +               &queue_size_per_kcompressd, 0644);
> +MODULE_PARM_DESC(queue_size_per_kcompressd,
> +               "Size of queue for kcompressd");
> +
> +int schedule_bio_write(void *mem, struct bio *bio, compress_callback cb)
> +{
> +       int i;
> +       bool submit_success = false;
> +       size_t sz_work = sizeof(struct write_work);
> +
> +       struct write_work entry = {
> +               .mem = mem,
> +               .bio = bio,
> +               .cb = cb
> +       };
> +
> +       if (unlikely(!atomic_read(&enable_kcompressd)))
> +               return -EBUSY;
> +
> +       if (!nr_kcompressd || !current_is_kswapd())
> +               return -EBUSY;
> +
> +       bio_get(bio);
> +
> +       for (i = 0; i < nr_kcompressd; i++) {
> +               submit_success =
> +                       (kfifo_avail(&kcompress[i].write_fifo) >= sz_work) &&
> +                       (sz_work == kfifo_in(&kcompress[i].write_fifo, &entry, sz_work));
> +
> +               if (submit_success) {
> +                       switch (atomic_read(&kcompress[i].running)) {
> +                       case KCOMPRESSD_NOT_STARTED:
> +                               atomic_set(&kcompress[i].running, KCOMPRESSD_RUNNING);
> +                               kcompress[i].kcompressd = kthread_run(kcompressd,
> +                                               &kcompressd_para[i], "kcompressd:%d", i);
> +                               if (IS_ERR(kcompress[i].kcompressd)) {
> +                                       atomic_set(&kcompress[i].running, KCOMPRESSD_NOT_STARTED);
> +                                       pr_warn("Failed to start kcompressd:%d\n", i);
> +                                       clean_bio_queue(i);
> +                               }
> +                               break;
> +                       case KCOMPRESSD_RUNNING:
> +                               break;
> +                       case KCOMPRESSD_SLEEPING:
> +                               wake_up_interruptible(&kcompress[i].kcompressd_wait);
> +                               break;
> +                       }
> +                       return 0;
> +               }
> +       }
> +
> +       bio_put(bio);
> +       return -EBUSY;
> +}
> +EXPORT_SYMBOL(schedule_bio_write);
> +
> +static int __init kcompressd_init(void)
> +{
> +       int ret;
> +
> +       nr_kcompressd = DEFAULT_NR_KCOMPRESSD;
> +       queue_size_per_kcompressd = INIT_QUEUE_SIZE;
> +
> +       ret = kcompress_update();
> +       if (ret) {
> +               pr_err("Init kcompressd failed!\n");
> +               return ret;
> +       }
> +
> +       atomic_set(&enable_kcompressd, true);
> +       blocking_notifier_call_chain(&kcompressd_notifier_list, 0, NULL);
> +       return 0;
> +}
> +
> +static void __exit kcompressd_exit(void)
> +{
> +       atomic_set(&enable_kcompressd, false);
> +       stop_all_kcompressd_thread();
> +
> +       kvfree(kcompress);
> +       kvfree(kcompressd_para);
> +}
> +
> +module_init(kcompressd_init);
> +module_exit(kcompressd_exit);
> +
> +MODULE_LICENSE("Dual BSD/GPL");
> +MODULE_AUTHOR("Qun-Wei Lin <qun-wei.lin@mediatek.com>");
> +MODULE_DESCRIPTION("Separate the page compression from the memory reclaiming");
> +
> diff --git a/drivers/block/zram/kcompressd.h b/drivers/block/zram/kcompressd.h
> new file mode 100644
> index 000000000000..2fe0b424a7af
> --- /dev/null
> +++ b/drivers/block/zram/kcompressd.h
> @@ -0,0 +1,25 @@
> +/* SPDX-License-Identifier: GPL-2.0 */
> +/*
> + * Copyright (C) 2024 MediaTek Inc.
> + */
> +
> +#ifndef _KCOMPRESSD_H_
> +#define _KCOMPRESSD_H_
> +
> +#include <linux/rwsem.h>
> +#include <linux/kfifo.h>
> +#include <linux/atomic.h>
> +
> +typedef void (*compress_callback)(void *mem, struct bio *bio);
> +
> +struct kcompress {
> +       struct task_struct *kcompressd;
> +       wait_queue_head_t kcompressd_wait;
> +       struct kfifo write_fifo;
> +       atomic_t running;
> +};
> +
> +int kcompressd_enabled(void);
> +int schedule_bio_write(void *mem, struct bio *bio, compress_callback cb);
> +#endif
> +
> diff --git a/drivers/block/zram/zram_drv.c b/drivers/block/zram/zram_drv.c
> index 2e1a70f2f4bd..bcd63ecb6ff2 100644
> --- a/drivers/block/zram/zram_drv.c
> +++ b/drivers/block/zram/zram_drv.c
> @@ -35,6 +35,7 @@
>  #include <linux/part_stat.h>
>  #include <linux/kernel_read_file.h>
>
> +#include "kcompressd.h"
>  #include "zram_drv.h"
>
>  static DEFINE_IDR(zram_index_idr);
> @@ -2240,6 +2241,15 @@ static void zram_bio_write(struct zram *zram, struct bio *bio)
>         bio_endio(bio);
>  }
>
> +#if IS_ENABLED(CONFIG_KCOMPRESSD)
> +static void zram_bio_write_callback(void *mem, struct bio *bio)
> +{
> +       struct zram *zram = (struct zram *)mem;
> +
> +       zram_bio_write(zram, bio);
> +}
> +#endif
> +
>  /*
>   * Handler function for all zram I/O requests.
>   */
> @@ -2252,6 +2262,10 @@ static void zram_submit_bio(struct bio *bio)
>                 zram_bio_read(zram, bio);
>                 break;
>         case REQ_OP_WRITE:
> +#if IS_ENABLED(CONFIG_KCOMPRESSD)
> +               if (kcompressd_enabled() && !schedule_bio_write(zram, bio, zram_bio_write_callback))
> +                       break;
> +#endif
>                 zram_bio_write(zram, bio);
>                 break;
>         case REQ_OP_DISCARD:
> @@ -2535,9 +2549,11 @@ static int zram_add(void)
>  #if ZRAM_LOGICAL_BLOCK_SIZE == PAGE_SIZE
>                 .max_write_zeroes_sectors       = UINT_MAX,
>  #endif
> -               .features                       = BLK_FEAT_STABLE_WRITES        |
> -                                                 BLK_FEAT_READ_SYNCHRONOUS     |
> -                                                 BLK_FEAT_WRITE_SYNCHRONOUS,
> +               .features                       = BLK_FEAT_STABLE_WRITES
> +                                                 | BLK_FEAT_READ_SYNCHRONOUS
> +#if !IS_ENABLED(CONFIG_KCOMPRESSD)
> +                                                 | BLK_FEAT_WRITE_SYNCHRONOUS,
> +#endif
>         };
>         struct zram *zram;
>         int ret, device_id;
> --
> 2.45.2
>

Thanks
Barry
Nhat Pham March 7, 2025, 11:13 p.m. UTC | #2
On Fri, Mar 7, 2025 at 11:41 AM Barry Song <21cnbao@gmail.com> wrote:
>
> On Sat, Mar 8, 2025 at 1:02 AM Qun-Wei Lin <qun-wei.lin@mediatek.com> wrote:
> >
> > Introduced Kcompressd to offload zram page compression, improving
> > system efficiency by handling compression separately from memory
> > reclaiming. Added necessary configurations and dependencies.
> >
> > Signed-off-by: Qun-Wei Lin <qun-wei.lin@mediatek.com>
> > ---
> >  drivers/block/zram/Kconfig      |  11 ++
> >  drivers/block/zram/Makefile     |   3 +-
> >  drivers/block/zram/kcompressd.c | 340 ++++++++++++++++++++++++++++++++
> >  drivers/block/zram/kcompressd.h |  25 +++
> >  drivers/block/zram/zram_drv.c   |  22 ++-
> >  5 files changed, 397 insertions(+), 4 deletions(-)
> >  create mode 100644 drivers/block/zram/kcompressd.c
> >  create mode 100644 drivers/block/zram/kcompressd.h
> >
> > diff --git a/drivers/block/zram/Kconfig b/drivers/block/zram/Kconfig
> > index 402b7b175863..f0a1b574f770 100644
> > --- a/drivers/block/zram/Kconfig
> > +++ b/drivers/block/zram/Kconfig
> > @@ -145,3 +145,14 @@ config ZRAM_MULTI_COMP
> >           re-compress pages using a potentially slower but more effective
> >           compression algorithm. Note, that IDLE page recompression
> >           requires ZRAM_TRACK_ENTRY_ACTIME.
> > +
> > +config KCOMPRESSD
> > +       tristate "Kcompressd: Accelerated zram compression"
> > +       depends on ZRAM
> > +       help
> > +         Kcompressd creates multiple daemons to accelerate the compression of pages
> > +         in zram, offloading this time-consuming task from the zram driver.
> > +
> > +         This approach improves system efficiency by handling page compression separately,
> > +         which was originally done by kswapd or direct reclaim.
>
> For direct reclaim, we were previously able to compress using multiple CPUs
> with multi-threading.
> After your patch, it seems that only a single thread/CPU is used for compression
> so it won't necessarily improve direct reclaim performance?
>
> Even for kswapd, we used to have multiple threads like [kswapd0], [kswapd1],
> and [kswapd2] for different nodes. Now, are we also limited to just one thread?
> I also wonder if this could be handled at the vmscan level instead of the zram
> level. then it might potentially help other sync devices or even zswap later.

Agree. A shared solution would be much appreciated. We can keep the
kcompressd idea, but have it accept IO work from multiple sources
(zram, zswap, whatever) through a shared API.

Otherwise we would need to reinvent the wheel multiple times :)
Nhat Pham March 7, 2025, 11:14 p.m. UTC | #3
On Fri, Mar 7, 2025 at 3:13 PM Nhat Pham <nphamcs@gmail.com> wrote:
>
>
> Agree. A shared solution would be much appreciated. We can keep the
> kcompressd idea, but have it accept IO work from multiple sources
> (zram, zswap, whatever) through a shared API.

by IO I meant compress work (should be double quoted "IO"). But you
get the idea :)
Qun-wei Lin (林群崴) March 10, 2025, 1:26 p.m. UTC | #4
On Fri, 2025-03-07 at 15:14 -0800, Nhat Pham wrote:
> 
> External email : Please do not click links or open attachments until
> you have verified the sender or the content.
> 
> 
> On Fri, Mar 7, 2025 at 3:13 PM Nhat Pham <nphamcs@gmail.com> wrote:
> > 
> > 
> > Agree. A shared solution would be much appreciated. We can keep the
> > kcompressd idea, but have it accept IO work from multiple sources
> > (zram, zswap, whatever) through a shared API.
> 
> by IO I meant compress work (should be double quoted "IO"). But you
> get the idea :)

I also agree that we can evolve into a more general solution.
So this is also why we designed kcompressd to do writeback using
callbacks rather than hard coding it directly into zram.

But currently we only extend Zram. If anyone has good suggestions, we
would greatly appreciate your help.

Thanks!
Qun-wei
Qun-wei Lin (林群崴) March 10, 2025, 1:26 p.m. UTC | #5
On Sat, 2025-03-08 at 08:41 +1300, Barry Song wrote:
> 
> External email : Please do not click links or open attachments until
> you have verified the sender or the content.
> 
> 
> On Sat, Mar 8, 2025 at 1:02 AM Qun-Wei Lin <qun-wei.lin@mediatek.com>
> wrote:
> > 
> > Introduced Kcompressd to offload zram page compression, improving
> > system efficiency by handling compression separately from memory
> > reclaiming. Added necessary configurations and dependencies.
> > 
> > Signed-off-by: Qun-Wei Lin <qun-wei.lin@mediatek.com>
> > ---
> >  drivers/block/zram/Kconfig      |  11 ++
> >  drivers/block/zram/Makefile     |   3 +-
> >  drivers/block/zram/kcompressd.c | 340
> > ++++++++++++++++++++++++++++++++
> >  drivers/block/zram/kcompressd.h |  25 +++
> >  drivers/block/zram/zram_drv.c   |  22 ++-
> >  5 files changed, 397 insertions(+), 4 deletions(-)
> >  create mode 100644 drivers/block/zram/kcompressd.c
> >  create mode 100644 drivers/block/zram/kcompressd.h
> > 
> > diff --git a/drivers/block/zram/Kconfig
> > b/drivers/block/zram/Kconfig
> > index 402b7b175863..f0a1b574f770 100644
> > --- a/drivers/block/zram/Kconfig
> > +++ b/drivers/block/zram/Kconfig
> > @@ -145,3 +145,14 @@ config ZRAM_MULTI_COMP
> >           re-compress pages using a potentially slower but more
> > effective
> >           compression algorithm. Note, that IDLE page recompression
> >           requires ZRAM_TRACK_ENTRY_ACTIME.
> > +
> > +config KCOMPRESSD
> > +       tristate "Kcompressd: Accelerated zram compression"
> > +       depends on ZRAM
> > +       help
> > +         Kcompressd creates multiple daemons to accelerate the
> > compression of pages
> > +         in zram, offloading this time-consuming task from the
> > zram driver.
> > +
> > +         This approach improves system efficiency by handling page
> > compression separately,
> > +         which was originally done by kswapd or direct reclaim.
> 
> For direct reclaim, we were previously able to compress using
> multiple CPUs
> with multi-threading.
> After your patch, it seems that only a single thread/CPU is used for
> compression
> so it won't necessarily improve direct reclaim performance?
> 

Our patch only splits the context of kswapd. When direct reclaim is
occurred, it is bypassed, so direct reclaim remains unchanged, with
each thread that needs memory directly reclaiming it.

> Even for kswapd, we used to have multiple threads like [kswapd0],
> [kswapd1],
> and [kswapd2] for different nodes. Now, are we also limited to just
> one thread?

We only considered a single kswapd here and didn't account for multiple
instances. Since I use kfifo to collect the bios, if there are multiple
kswapds, we need to add a lock to protect the bio queue. I can revise
this in the 2nd version, or do you have any other suggested approaches?

> I also wonder if this could be handled at the vmscan level instead of
> the zram
> level. then it might potentially help other sync devices or even
> zswap later.
> 
> But I agree that for phones, modifying zram seems like an easier
> starting
> point. However, relying on a single thread isn't always the best
> approach.
> 
> 
> > +
> > diff --git a/drivers/block/zram/Makefile
> > b/drivers/block/zram/Makefile
> > index 0fdefd576691..23baa5dfceb9 100644
> > --- a/drivers/block/zram/Makefile
> > +++ b/drivers/block/zram/Makefile
> > @@ -9,4 +9,5 @@ zram-$(CONFIG_ZRAM_BACKEND_ZSTD)        +=
> > backend_zstd.o
> >  zram-$(CONFIG_ZRAM_BACKEND_DEFLATE)    += backend_deflate.o
> >  zram-$(CONFIG_ZRAM_BACKEND_842)                += backend_842.o
> > 
> > -obj-$(CONFIG_ZRAM)     +=      zram.o
> > +obj-$(CONFIG_ZRAM)             += zram.o
> > +obj-$(CONFIG_KCOMPRESSD)       += kcompressd.o
> > diff --git a/drivers/block/zram/kcompressd.c
> > b/drivers/block/zram/kcompressd.c
> > new file mode 100644
> > index 000000000000..195b7e386869
> > --- /dev/null
> > +++ b/drivers/block/zram/kcompressd.c
> > @@ -0,0 +1,340 @@
> > +// SPDX-License-Identifier: GPL-2.0
> > +/*
> > + * Copyright (C) 2024 MediaTek Inc.
> > + */
> > +
> > +#include <linux/module.h>
> > +#include <linux/kernel.h>
> > +#include <linux/bio.h>
> > +#include <linux/bitops.h>
> > +#include <linux/freezer.h>
> > +#include <linux/kernel.h>
> > +#include <linux/psi.h>
> > +#include <linux/kfifo.h>
> > +#include <linux/swap.h>
> > +#include <linux/delay.h>
> > +
> > +#include "kcompressd.h"
> > +
> > +#define INIT_QUEUE_SIZE                4096
> > +#define DEFAULT_NR_KCOMPRESSD  4
> > +
> > +static atomic_t enable_kcompressd;
> > +static unsigned int nr_kcompressd;
> > +static unsigned int queue_size_per_kcompressd;
> > +static struct kcompress *kcompress;
> > +
> > +enum run_state {
> > +       KCOMPRESSD_NOT_STARTED = 0,
> > +       KCOMPRESSD_RUNNING,
> > +       KCOMPRESSD_SLEEPING,
> > +};
> > +
> > +struct kcompressd_para {
> > +       wait_queue_head_t *kcompressd_wait;
> > +       struct kfifo *write_fifo;
> > +       atomic_t *running;
> > +};
> > +
> > +static struct kcompressd_para *kcompressd_para;
> > +static BLOCKING_NOTIFIER_HEAD(kcompressd_notifier_list);
> > +
> > +struct write_work {
> > +       void *mem;
> > +       struct bio *bio;
> > +       compress_callback cb;
> > +};
> > +
> > +int kcompressd_enabled(void)
> > +{
> > +       return likely(atomic_read(&enable_kcompressd));
> > +}
> > +EXPORT_SYMBOL(kcompressd_enabled);
> > +
> > +static void kcompressd_try_to_sleep(struct kcompressd_para *p)
> > +{
> > +       DEFINE_WAIT(wait);
> > +
> > +       if (!kfifo_is_empty(p->write_fifo))
> > +               return;
> > +
> > +       if (freezing(current) || kthread_should_stop())
> > +               return;
> > +
> > +       atomic_set(p->running, KCOMPRESSD_SLEEPING);
> > +       prepare_to_wait(p->kcompressd_wait, &wait,
> > TASK_INTERRUPTIBLE);
> > +
> > +       /*
> > +        * After a short sleep, check if it was a premature sleep.
> > If not, then
> > +        * go fully to sleep until explicitly woken up.
> > +        */
> > +       if (!kthread_should_stop() && kfifo_is_empty(p-
> > >write_fifo))
> > +               schedule();
> > +
> > +       finish_wait(p->kcompressd_wait, &wait);
> > +       atomic_set(p->running, KCOMPRESSD_RUNNING);
> > +}
> > +
> > +static int kcompressd(void *para)
> > +{
> > +       struct task_struct *tsk = current;
> > +       struct kcompressd_para *p = (struct kcompressd_para *)para;
> > +
> > +       tsk->flags |= PF_MEMALLOC | PF_KSWAPD;
> > +       set_freezable();
> > +
> > +       while (!kthread_should_stop()) {
> > +               bool ret;
> > +
> > +               kcompressd_try_to_sleep(p);
> > +               ret = try_to_freeze();
> > +               if (kthread_should_stop())
> > +                       break;
> > +
> > +               if (ret)
> > +                       continue;
> > +
> > +               while (!kfifo_is_empty(p->write_fifo)) {
> > +                       struct write_work entry;
> > +
> > +                       if (sizeof(struct write_work) ==
> > kfifo_out(p->write_fifo,
> > +                                               &entry,
> > sizeof(struct write_work))) {
> > +                               entry.cb(entry.mem, entry.bio);
> > +                               bio_put(entry.bio);
> > +                       }
> > +               }
> > +
> > +       }
> > +
> > +       tsk->flags &= ~(PF_MEMALLOC | PF_KSWAPD);
> > +       atomic_set(p->running, KCOMPRESSD_NOT_STARTED);
> > +       return 0;
> > +}
> > +
> > +static int init_write_queue(void)
> > +{
> > +       int i;
> > +       unsigned int queue_len = queue_size_per_kcompressd *
> > sizeof(struct write_work);
> > +
> > +       for (i = 0; i < nr_kcompressd; i++) {
> > +               if (kfifo_alloc(&kcompress[i].write_fifo,
> > +                                       queue_len, GFP_KERNEL)) {
> > +                       pr_err("Failed to alloc kfifo %d\n", i);
> > +                       return -ENOMEM;
> > +               }
> > +       }
> > +       return 0;
> > +}
> > +
> > +static void clean_bio_queue(int idx)
> > +{
> > +       struct write_work entry;
> > +
> > +       while (sizeof(struct write_work) ==
> > kfifo_out(&kcompress[idx].write_fifo,
> > +                               &entry, sizeof(struct write_work)))
> > {
> > +               bio_put(entry.bio);
> > +               entry.cb(entry.mem, entry.bio);
> > +       }
> > +       kfifo_free(&kcompress[idx].write_fifo);
> > +}
> > +
> > +static int kcompress_update(void)
> > +{
> > +       int i;
> > +       int ret;
> > +
> > +       kcompress = kvmalloc_array(nr_kcompressd, sizeof(struct
> > kcompress), GFP_KERNEL);
> > +       if (!kcompress)
> > +               return -ENOMEM;
> > +
> > +       kcompressd_para = kvmalloc_array(nr_kcompressd,
> > sizeof(struct kcompressd_para), GFP_KERNEL);
> > +       if (!kcompressd_para)
> > +               return -ENOMEM;
> > +
> > +       ret = init_write_queue();
> > +       if (ret) {
> > +               pr_err("Initialization of writing to FIFOs
> > failed!!\n");
> > +               return ret;
> > +       }
> > +
> > +       for (i = 0; i < nr_kcompressd; i++) {
> > +               init_waitqueue_head(&kcompress[i].kcompressd_wait);
> > +               kcompressd_para[i].kcompressd_wait =
> > &kcompress[i].kcompressd_wait;
> > +               kcompressd_para[i].write_fifo =
> > &kcompress[i].write_fifo;
> > +               kcompressd_para[i].running = &kcompress[i].running;
> > +       }
> > +
> > +       return 0;
> > +}
> > +
> > +static void stop_all_kcompressd_thread(void)
> > +{
> > +       int i;
> > +
> > +       for (i = 0; i < nr_kcompressd; i++) {
> > +               kthread_stop(kcompress[i].kcompressd);
> > +               kcompress[i].kcompressd = NULL;
> > +               clean_bio_queue(i);
> > +       }
> > +}
> > +
> > +static int do_nr_kcompressd_handler(const char *val,
> > +               const struct kernel_param *kp)
> > +{
> > +       int ret;
> > +
> > +       atomic_set(&enable_kcompressd, false);
> > +
> > +       stop_all_kcompressd_thread();
> > +
> > +       ret = param_set_int(val, kp);
> > +       if (!ret) {
> > +               pr_err("Invalid number of kcompressd.\n");
> > +               return -EINVAL;
> > +       }
> > +
> > +       ret = init_write_queue();
> > +       if (ret) {
> > +               pr_err("Initialization of writing to FIFOs
> > failed!!\n");
> > +               return ret;
> > +       }
> > +
> > +       atomic_set(&enable_kcompressd, true);
> > +
> > +       return 0;
> > +}
> > +
> > +static const struct kernel_param_ops
> > param_ops_change_nr_kcompressd = {
> > +       .set = &do_nr_kcompressd_handler,
> > +       .get = &param_get_uint,
> > +       .free = NULL,
> > +};
> > +
> > +module_param_cb(nr_kcompressd, &param_ops_change_nr_kcompressd,
> > +               &nr_kcompressd, 0644);
> > +MODULE_PARM_DESC(nr_kcompressd, "Number of pre-created daemon for
> > page compression");
> > +
> > +static int do_queue_size_per_kcompressd_handler(const char *val,
> > +               const struct kernel_param *kp)
> > +{
> > +       int ret;
> > +
> > +       atomic_set(&enable_kcompressd, false);
> > +
> > +       stop_all_kcompressd_thread();
> > +
> > +       ret = param_set_int(val, kp);
> > +       if (!ret) {
> > +               pr_err("Invalid queue size for kcompressd.\n");
> > +               return -EINVAL;
> > +       }
> > +
> > +       ret = init_write_queue();
> > +       if (ret) {
> > +               pr_err("Initialization of writing to FIFOs
> > failed!!\n");
> > +               return ret;
> > +       }
> > +
> > +       pr_info("Queue size for kcompressd was changed: %d\n",
> > queue_size_per_kcompressd);
> > +
> > +       atomic_set(&enable_kcompressd, true);
> > +       return 0;
> > +}
> > +
> > +static const struct kernel_param_ops
> > param_ops_change_queue_size_per_kcompressd = {
> > +       .set = &do_queue_size_per_kcompressd_handler,
> > +       .get = &param_get_uint,
> > +       .free = NULL,
> > +};
> > +
> > +module_param_cb(queue_size_per_kcompressd,
> > &param_ops_change_queue_size_per_kcompressd,
> > +               &queue_size_per_kcompressd, 0644);
> > +MODULE_PARM_DESC(queue_size_per_kcompressd,
> > +               "Size of queue for kcompressd");
> > +
> > +int schedule_bio_write(void *mem, struct bio *bio,
> > compress_callback cb)
> > +{
> > +       int i;
> > +       bool submit_success = false;
> > +       size_t sz_work = sizeof(struct write_work);
> > +
> > +       struct write_work entry = {
> > +               .mem = mem,
> > +               .bio = bio,
> > +               .cb = cb
> > +       };
> > +
> > +       if (unlikely(!atomic_read(&enable_kcompressd)))
> > +               return -EBUSY;
> > +
> > +       if (!nr_kcompressd || !current_is_kswapd())
> > +               return -EBUSY;
> > +
> > +       bio_get(bio);
> > +
> > +       for (i = 0; i < nr_kcompressd; i++) {
> > +               submit_success =
> > +                       (kfifo_avail(&kcompress[i].write_fifo) >=
> > sz_work) &&
> > +                       (sz_work ==
> > kfifo_in(&kcompress[i].write_fifo, &entry, sz_work));
> > +
> > +               if (submit_success) {
> > +                       switch (atomic_read(&kcompress[i].running))
> > {
> > +                       case KCOMPRESSD_NOT_STARTED:
> > +                               atomic_set(&kcompress[i].running,
> > KCOMPRESSD_RUNNING);
> > +                               kcompress[i].kcompressd =
> > kthread_run(kcompressd,
> > +                                              
> > &kcompressd_para[i], "kcompressd:%d", i);
> > +                               if
> > (IS_ERR(kcompress[i].kcompressd)) {
> > +                                      
> > atomic_set(&kcompress[i].running, KCOMPRESSD_NOT_STARTED);
> > +                                       pr_warn("Failed to start
> > kcompressd:%d\n", i);
> > +                                       clean_bio_queue(i);
> > +                               }
> > +                               break;
> > +                       case KCOMPRESSD_RUNNING:
> > +                               break;
> > +                       case KCOMPRESSD_SLEEPING:
> > +                              
> > wake_up_interruptible(&kcompress[i].kcompressd_wait);
> > +                               break;
> > +                       }
> > +                       return 0;
> > +               }
> > +       }
> > +
> > +       bio_put(bio);
> > +       return -EBUSY;
> > +}
> > +EXPORT_SYMBOL(schedule_bio_write);
> > +
> > +static int __init kcompressd_init(void)
> > +{
> > +       int ret;
> > +
> > +       nr_kcompressd = DEFAULT_NR_KCOMPRESSD;
> > +       queue_size_per_kcompressd = INIT_QUEUE_SIZE;
> > +
> > +       ret = kcompress_update();
> > +       if (ret) {
> > +               pr_err("Init kcompressd failed!\n");
> > +               return ret;
> > +       }
> > +
> > +       atomic_set(&enable_kcompressd, true);
> > +       blocking_notifier_call_chain(&kcompressd_notifier_list, 0,
> > NULL);
> > +       return 0;
> > +}
> > +
> > +static void __exit kcompressd_exit(void)
> > +{
> > +       atomic_set(&enable_kcompressd, false);
> > +       stop_all_kcompressd_thread();
> > +
> > +       kvfree(kcompress);
> > +       kvfree(kcompressd_para);
> > +}
> > +
> > +module_init(kcompressd_init);
> > +module_exit(kcompressd_exit);
> > +
> > +MODULE_LICENSE("Dual BSD/GPL");
> > +MODULE_AUTHOR("Qun-Wei Lin <qun-wei.lin@mediatek.com>");
> > +MODULE_DESCRIPTION("Separate the page compression from the memory
> > reclaiming");
> > +
> > diff --git a/drivers/block/zram/kcompressd.h
> > b/drivers/block/zram/kcompressd.h
> > new file mode 100644
> > index 000000000000..2fe0b424a7af
> > --- /dev/null
> > +++ b/drivers/block/zram/kcompressd.h
> > @@ -0,0 +1,25 @@
> > +/* SPDX-License-Identifier: GPL-2.0 */
> > +/*
> > + * Copyright (C) 2024 MediaTek Inc.
> > + */
> > +
> > +#ifndef _KCOMPRESSD_H_
> > +#define _KCOMPRESSD_H_
> > +
> > +#include <linux/rwsem.h>
> > +#include <linux/kfifo.h>
> > +#include <linux/atomic.h>
> > +
> > +typedef void (*compress_callback)(void *mem, struct bio *bio);
> > +
> > +struct kcompress {
> > +       struct task_struct *kcompressd;
> > +       wait_queue_head_t kcompressd_wait;
> > +       struct kfifo write_fifo;
> > +       atomic_t running;
> > +};
> > +
> > +int kcompressd_enabled(void);
> > +int schedule_bio_write(void *mem, struct bio *bio,
> > compress_callback cb);
> > +#endif
> > +
> > diff --git a/drivers/block/zram/zram_drv.c
> > b/drivers/block/zram/zram_drv.c
> > index 2e1a70f2f4bd..bcd63ecb6ff2 100644
> > --- a/drivers/block/zram/zram_drv.c
> > +++ b/drivers/block/zram/zram_drv.c
> > @@ -35,6 +35,7 @@
> >  #include <linux/part_stat.h>
> >  #include <linux/kernel_read_file.h>
> > 
> > +#include "kcompressd.h"
> >  #include "zram_drv.h"
> > 
> >  static DEFINE_IDR(zram_index_idr);
> > @@ -2240,6 +2241,15 @@ static void zram_bio_write(struct zram
> > *zram, struct bio *bio)
> >         bio_endio(bio);
> >  }
> > 
> > +#if IS_ENABLED(CONFIG_KCOMPRESSD)
> > +static void zram_bio_write_callback(void *mem, struct bio *bio)
> > +{
> > +       struct zram *zram = (struct zram *)mem;
> > +
> > +       zram_bio_write(zram, bio);
> > +}
> > +#endif
> > +
> >  /*
> >   * Handler function for all zram I/O requests.
> >   */
> > @@ -2252,6 +2262,10 @@ static void zram_submit_bio(struct bio *bio)
> >                 zram_bio_read(zram, bio);
> >                 break;
> >         case REQ_OP_WRITE:
> > +#if IS_ENABLED(CONFIG_KCOMPRESSD)
> > +               if (kcompressd_enabled() &&
> > !schedule_bio_write(zram, bio, zram_bio_write_callback))
> > +                       break;
> > +#endif
> >                 zram_bio_write(zram, bio);
> >                 break;
> >         case REQ_OP_DISCARD:
> > @@ -2535,9 +2549,11 @@ static int zram_add(void)
> >  #if ZRAM_LOGICAL_BLOCK_SIZE == PAGE_SIZE
> >                 .max_write_zeroes_sectors       = UINT_MAX,
> >  #endif
> > -               .features                       =
> > BLK_FEAT_STABLE_WRITES        |
> > -                                                
> > BLK_FEAT_READ_SYNCHRONOUS     |
> > -                                                
> > BLK_FEAT_WRITE_SYNCHRONOUS,
> > +               .features                       =
> > BLK_FEAT_STABLE_WRITES
> > +                                                 |
> > BLK_FEAT_READ_SYNCHRONOUS
> > +#if !IS_ENABLED(CONFIG_KCOMPRESSD)
> > +                                                 |
> > BLK_FEAT_WRITE_SYNCHRONOUS,
> > +#endif
> >         };
> >         struct zram *zram;
> >         int ret, device_id;
> > --
> > 2.45.2
> > 
> 
> Thanks
> Barry
Sergey Senozhatsky March 11, 2025, 5:02 a.m. UTC | #6
On (25/03/07 15:13), Nhat Pham wrote:
> > > +config KCOMPRESSD
> > > +       tristate "Kcompressd: Accelerated zram compression"
> > > +       depends on ZRAM
> > > +       help
> > > +         Kcompressd creates multiple daemons to accelerate the compression of pages
> > > +         in zram, offloading this time-consuming task from the zram driver.
> > > +
> > > +         This approach improves system efficiency by handling page compression separately,
> > > +         which was originally done by kswapd or direct reclaim.
> >
> > For direct reclaim, we were previously able to compress using multiple CPUs
> > with multi-threading.
> > After your patch, it seems that only a single thread/CPU is used for compression
> > so it won't necessarily improve direct reclaim performance?
> >
> > Even for kswapd, we used to have multiple threads like [kswapd0], [kswapd1],
> > and [kswapd2] for different nodes. Now, are we also limited to just one thread?
> > I also wonder if this could be handled at the vmscan level instead of the zram
> > level. then it might potentially help other sync devices or even zswap later.
> 
> Agree. A shared solution would be much appreciated. We can keep the
> kcompressd idea, but have it accept IO work from multiple sources
> (zram, zswap, whatever) through a shared API.

I guess it also need to take swapoff into consideration (especially
if it takes I/O from multiple sources)?
Barry Song March 11, 2025, 7:05 a.m. UTC | #7
On Tue, Mar 11, 2025 at 2:26 AM Qun-wei Lin (林群崴)
<Qun-wei.Lin@mediatek.com> wrote:
>
> On Sat, 2025-03-08 at 08:41 +1300, Barry Song wrote:
> >
> > External email : Please do not click links or open attachments until
> > you have verified the sender or the content.
> >
> >
> > On Sat, Mar 8, 2025 at 1:02 AM Qun-Wei Lin <qun-wei.lin@mediatek.com>
> > wrote:
> > >
> > > Introduced Kcompressd to offload zram page compression, improving
> > > system efficiency by handling compression separately from memory
> > > reclaiming. Added necessary configurations and dependencies.
> > >
> > > Signed-off-by: Qun-Wei Lin <qun-wei.lin@mediatek.com>
> > > ---
> > >  drivers/block/zram/Kconfig      |  11 ++
> > >  drivers/block/zram/Makefile     |   3 +-
> > >  drivers/block/zram/kcompressd.c | 340
> > > ++++++++++++++++++++++++++++++++
> > >  drivers/block/zram/kcompressd.h |  25 +++
> > >  drivers/block/zram/zram_drv.c   |  22 ++-
> > >  5 files changed, 397 insertions(+), 4 deletions(-)
> > >  create mode 100644 drivers/block/zram/kcompressd.c
> > >  create mode 100644 drivers/block/zram/kcompressd.h
> > >
> > > diff --git a/drivers/block/zram/Kconfig
> > > b/drivers/block/zram/Kconfig
> > > index 402b7b175863..f0a1b574f770 100644
> > > --- a/drivers/block/zram/Kconfig
> > > +++ b/drivers/block/zram/Kconfig
> > > @@ -145,3 +145,14 @@ config ZRAM_MULTI_COMP
> > >           re-compress pages using a potentially slower but more
> > > effective
> > >           compression algorithm. Note, that IDLE page recompression
> > >           requires ZRAM_TRACK_ENTRY_ACTIME.
> > > +
> > > +config KCOMPRESSD
> > > +       tristate "Kcompressd: Accelerated zram compression"
> > > +       depends on ZRAM
> > > +       help
> > > +         Kcompressd creates multiple daemons to accelerate the
> > > compression of pages
> > > +         in zram, offloading this time-consuming task from the
> > > zram driver.
> > > +
> > > +         This approach improves system efficiency by handling page
> > > compression separately,
> > > +         which was originally done by kswapd or direct reclaim.
> >
> > For direct reclaim, we were previously able to compress using
> > multiple CPUs
> > with multi-threading.
> > After your patch, it seems that only a single thread/CPU is used for
> > compression
> > so it won't necessarily improve direct reclaim performance?
> >
>
> Our patch only splits the context of kswapd. When direct reclaim is
> occurred, it is bypassed, so direct reclaim remains unchanged, with
> each thread that needs memory directly reclaiming it.

Qun-wei, I’m getting a bit confused. Looking at the code in page_io.c,
you always call swap_writepage_bdev_async() no matter if it is kswapd
or direct reclaim:

- else if (data_race(sis->flags & SWP_SYNCHRONOUS_IO))
+ else if (data_race(sis->flags & SWP_WRITE_SYNCHRONOUS_IO))
           swap_writepage_bdev_sync(folio, wbc, sis);
  else
            swap_writepage_bdev_async(folio, wbc, sis);

In zram, I notice you are bypassing kcompressd by:

+ if (!nr_kcompressd || !current_is_kswapd())
+        return -EBUSY;

How will this work if no one is calling __end_swap_bio_write(&bio),
which is present in swap_writepage_bdev_sync()?
Am I missing something? Or is it done by zram_bio_write() ?

On the other hand, zram is a generic block device, and coupling its
code with kswapd/direct reclaim somehow violates layering
principles :-)

>
> > Even for kswapd, we used to have multiple threads like [kswapd0],
> > [kswapd1],
> > and [kswapd2] for different nodes. Now, are we also limited to just
> > one thread?
>
> We only considered a single kswapd here and didn't account for multiple
> instances. Since I use kfifo to collect the bios, if there are multiple
> kswapds, we need to add a lock to protect the bio queue. I can revise
> this in the 2nd version, or do you have any other suggested approaches?

I'm wondering if we can move the code to vmscan/page_io instead
of zram. If we're using a sync I/O swap device or have enabled zswap,
we could run reclamation in this separate thread, which should also be
NUMA-aware.

I would definitely be interested in prototyping it when I have the time.

>
> > I also wonder if this could be handled at the vmscan level instead of
> > the zram
> > level. then it might potentially help other sync devices or even
> > zswap later.
> >
> > But I agree that for phones, modifying zram seems like an easier
> > starting
> > point. However, relying on a single thread isn't always the best
> > approach.
> >
> >
> > > +
> > > diff --git a/drivers/block/zram/Makefile
> > > b/drivers/block/zram/Makefile
> > > index 0fdefd576691..23baa5dfceb9 100644
> > > --- a/drivers/block/zram/Makefile
> > > +++ b/drivers/block/zram/Makefile
> > > @@ -9,4 +9,5 @@ zram-$(CONFIG_ZRAM_BACKEND_ZSTD)        +=
> > > backend_zstd.o
> > >  zram-$(CONFIG_ZRAM_BACKEND_DEFLATE)    += backend_deflate.o
> > >  zram-$(CONFIG_ZRAM_BACKEND_842)                += backend_842.o
> > >
> > > -obj-$(CONFIG_ZRAM)     +=      zram.o
> > > +obj-$(CONFIG_ZRAM)             += zram.o
> > > +obj-$(CONFIG_KCOMPRESSD)       += kcompressd.o
> > > diff --git a/drivers/block/zram/kcompressd.c
> > > b/drivers/block/zram/kcompressd.c
> > > new file mode 100644
> > > index 000000000000..195b7e386869
> > > --- /dev/null
> > > +++ b/drivers/block/zram/kcompressd.c
> > > @@ -0,0 +1,340 @@
> > > +// SPDX-License-Identifier: GPL-2.0
> > > +/*
> > > + * Copyright (C) 2024 MediaTek Inc.
> > > + */
> > > +
> > > +#include <linux/module.h>
> > > +#include <linux/kernel.h>
> > > +#include <linux/bio.h>
> > > +#include <linux/bitops.h>
> > > +#include <linux/freezer.h>
> > > +#include <linux/kernel.h>
> > > +#include <linux/psi.h>
> > > +#include <linux/kfifo.h>
> > > +#include <linux/swap.h>
> > > +#include <linux/delay.h>
> > > +
> > > +#include "kcompressd.h"
> > > +
> > > +#define INIT_QUEUE_SIZE                4096
> > > +#define DEFAULT_NR_KCOMPRESSD  4
> > > +
> > > +static atomic_t enable_kcompressd;
> > > +static unsigned int nr_kcompressd;
> > > +static unsigned int queue_size_per_kcompressd;
> > > +static struct kcompress *kcompress;
> > > +
> > > +enum run_state {
> > > +       KCOMPRESSD_NOT_STARTED = 0,
> > > +       KCOMPRESSD_RUNNING,
> > > +       KCOMPRESSD_SLEEPING,
> > > +};
> > > +
> > > +struct kcompressd_para {
> > > +       wait_queue_head_t *kcompressd_wait;
> > > +       struct kfifo *write_fifo;
> > > +       atomic_t *running;
> > > +};
> > > +
> > > +static struct kcompressd_para *kcompressd_para;
> > > +static BLOCKING_NOTIFIER_HEAD(kcompressd_notifier_list);
> > > +
> > > +struct write_work {
> > > +       void *mem;
> > > +       struct bio *bio;
> > > +       compress_callback cb;
> > > +};
> > > +
> > > +int kcompressd_enabled(void)
> > > +{
> > > +       return likely(atomic_read(&enable_kcompressd));
> > > +}
> > > +EXPORT_SYMBOL(kcompressd_enabled);
> > > +
> > > +static void kcompressd_try_to_sleep(struct kcompressd_para *p)
> > > +{
> > > +       DEFINE_WAIT(wait);
> > > +
> > > +       if (!kfifo_is_empty(p->write_fifo))
> > > +               return;
> > > +
> > > +       if (freezing(current) || kthread_should_stop())
> > > +               return;
> > > +
> > > +       atomic_set(p->running, KCOMPRESSD_SLEEPING);
> > > +       prepare_to_wait(p->kcompressd_wait, &wait,
> > > TASK_INTERRUPTIBLE);
> > > +
> > > +       /*
> > > +        * After a short sleep, check if it was a premature sleep.
> > > If not, then
> > > +        * go fully to sleep until explicitly woken up.
> > > +        */
> > > +       if (!kthread_should_stop() && kfifo_is_empty(p-
> > > >write_fifo))
> > > +               schedule();
> > > +
> > > +       finish_wait(p->kcompressd_wait, &wait);
> > > +       atomic_set(p->running, KCOMPRESSD_RUNNING);
> > > +}
> > > +
> > > +static int kcompressd(void *para)
> > > +{
> > > +       struct task_struct *tsk = current;
> > > +       struct kcompressd_para *p = (struct kcompressd_para *)para;
> > > +
> > > +       tsk->flags |= PF_MEMALLOC | PF_KSWAPD;
> > > +       set_freezable();
> > > +
> > > +       while (!kthread_should_stop()) {
> > > +               bool ret;
> > > +
> > > +               kcompressd_try_to_sleep(p);
> > > +               ret = try_to_freeze();
> > > +               if (kthread_should_stop())
> > > +                       break;
> > > +
> > > +               if (ret)
> > > +                       continue;
> > > +
> > > +               while (!kfifo_is_empty(p->write_fifo)) {
> > > +                       struct write_work entry;
> > > +
> > > +                       if (sizeof(struct write_work) ==
> > > kfifo_out(p->write_fifo,
> > > +                                               &entry,
> > > sizeof(struct write_work))) {
> > > +                               entry.cb(entry.mem, entry.bio);
> > > +                               bio_put(entry.bio);
> > > +                       }
> > > +               }
> > > +
> > > +       }
> > > +
> > > +       tsk->flags &= ~(PF_MEMALLOC | PF_KSWAPD);
> > > +       atomic_set(p->running, KCOMPRESSD_NOT_STARTED);
> > > +       return 0;
> > > +}
> > > +
> > > +static int init_write_queue(void)
> > > +{
> > > +       int i;
> > > +       unsigned int queue_len = queue_size_per_kcompressd *
> > > sizeof(struct write_work);
> > > +
> > > +       for (i = 0; i < nr_kcompressd; i++) {
> > > +               if (kfifo_alloc(&kcompress[i].write_fifo,
> > > +                                       queue_len, GFP_KERNEL)) {
> > > +                       pr_err("Failed to alloc kfifo %d\n", i);
> > > +                       return -ENOMEM;
> > > +               }
> > > +       }
> > > +       return 0;
> > > +}
> > > +
> > > +static void clean_bio_queue(int idx)
> > > +{
> > > +       struct write_work entry;
> > > +
> > > +       while (sizeof(struct write_work) ==
> > > kfifo_out(&kcompress[idx].write_fifo,
> > > +                               &entry, sizeof(struct write_work)))
> > > {
> > > +               bio_put(entry.bio);
> > > +               entry.cb(entry.mem, entry.bio);
> > > +       }
> > > +       kfifo_free(&kcompress[idx].write_fifo);
> > > +}
> > > +
> > > +static int kcompress_update(void)
> > > +{
> > > +       int i;
> > > +       int ret;
> > > +
> > > +       kcompress = kvmalloc_array(nr_kcompressd, sizeof(struct
> > > kcompress), GFP_KERNEL);
> > > +       if (!kcompress)
> > > +               return -ENOMEM;
> > > +
> > > +       kcompressd_para = kvmalloc_array(nr_kcompressd,
> > > sizeof(struct kcompressd_para), GFP_KERNEL);
> > > +       if (!kcompressd_para)
> > > +               return -ENOMEM;
> > > +
> > > +       ret = init_write_queue();
> > > +       if (ret) {
> > > +               pr_err("Initialization of writing to FIFOs
> > > failed!!\n");
> > > +               return ret;
> > > +       }
> > > +
> > > +       for (i = 0; i < nr_kcompressd; i++) {
> > > +               init_waitqueue_head(&kcompress[i].kcompressd_wait);
> > > +               kcompressd_para[i].kcompressd_wait =
> > > &kcompress[i].kcompressd_wait;
> > > +               kcompressd_para[i].write_fifo =
> > > &kcompress[i].write_fifo;
> > > +               kcompressd_para[i].running = &kcompress[i].running;
> > > +       }
> > > +
> > > +       return 0;
> > > +}
> > > +
> > > +static void stop_all_kcompressd_thread(void)
> > > +{
> > > +       int i;
> > > +
> > > +       for (i = 0; i < nr_kcompressd; i++) {
> > > +               kthread_stop(kcompress[i].kcompressd);
> > > +               kcompress[i].kcompressd = NULL;
> > > +               clean_bio_queue(i);
> > > +       }
> > > +}
> > > +
> > > +static int do_nr_kcompressd_handler(const char *val,
> > > +               const struct kernel_param *kp)
> > > +{
> > > +       int ret;
> > > +
> > > +       atomic_set(&enable_kcompressd, false);
> > > +
> > > +       stop_all_kcompressd_thread();
> > > +
> > > +       ret = param_set_int(val, kp);
> > > +       if (!ret) {
> > > +               pr_err("Invalid number of kcompressd.\n");
> > > +               return -EINVAL;
> > > +       }
> > > +
> > > +       ret = init_write_queue();
> > > +       if (ret) {
> > > +               pr_err("Initialization of writing to FIFOs
> > > failed!!\n");
> > > +               return ret;
> > > +       }
> > > +
> > > +       atomic_set(&enable_kcompressd, true);
> > > +
> > > +       return 0;
> > > +}
> > > +
> > > +static const struct kernel_param_ops
> > > param_ops_change_nr_kcompressd = {
> > > +       .set = &do_nr_kcompressd_handler,
> > > +       .get = &param_get_uint,
> > > +       .free = NULL,
> > > +};
> > > +
> > > +module_param_cb(nr_kcompressd, &param_ops_change_nr_kcompressd,
> > > +               &nr_kcompressd, 0644);
> > > +MODULE_PARM_DESC(nr_kcompressd, "Number of pre-created daemon for
> > > page compression");
> > > +
> > > +static int do_queue_size_per_kcompressd_handler(const char *val,
> > > +               const struct kernel_param *kp)
> > > +{
> > > +       int ret;
> > > +
> > > +       atomic_set(&enable_kcompressd, false);
> > > +
> > > +       stop_all_kcompressd_thread();
> > > +
> > > +       ret = param_set_int(val, kp);
> > > +       if (!ret) {
> > > +               pr_err("Invalid queue size for kcompressd.\n");
> > > +               return -EINVAL;
> > > +       }
> > > +
> > > +       ret = init_write_queue();
> > > +       if (ret) {
> > > +               pr_err("Initialization of writing to FIFOs
> > > failed!!\n");
> > > +               return ret;
> > > +       }
> > > +
> > > +       pr_info("Queue size for kcompressd was changed: %d\n",
> > > queue_size_per_kcompressd);
> > > +
> > > +       atomic_set(&enable_kcompressd, true);
> > > +       return 0;
> > > +}
> > > +
> > > +static const struct kernel_param_ops
> > > param_ops_change_queue_size_per_kcompressd = {
> > > +       .set = &do_queue_size_per_kcompressd_handler,
> > > +       .get = &param_get_uint,
> > > +       .free = NULL,
> > > +};
> > > +
> > > +module_param_cb(queue_size_per_kcompressd,
> > > &param_ops_change_queue_size_per_kcompressd,
> > > +               &queue_size_per_kcompressd, 0644);
> > > +MODULE_PARM_DESC(queue_size_per_kcompressd,
> > > +               "Size of queue for kcompressd");
> > > +
> > > +int schedule_bio_write(void *mem, struct bio *bio,
> > > compress_callback cb)
> > > +{
> > > +       int i;
> > > +       bool submit_success = false;
> > > +       size_t sz_work = sizeof(struct write_work);
> > > +
> > > +       struct write_work entry = {
> > > +               .mem = mem,
> > > +               .bio = bio,
> > > +               .cb = cb
> > > +       };
> > > +
> > > +       if (unlikely(!atomic_read(&enable_kcompressd)))
> > > +               return -EBUSY;
> > > +
> > > +       if (!nr_kcompressd || !current_is_kswapd())
> > > +               return -EBUSY;
> > > +
> > > +       bio_get(bio);
> > > +
> > > +       for (i = 0; i < nr_kcompressd; i++) {
> > > +               submit_success =
> > > +                       (kfifo_avail(&kcompress[i].write_fifo) >=
> > > sz_work) &&
> > > +                       (sz_work ==
> > > kfifo_in(&kcompress[i].write_fifo, &entry, sz_work));
> > > +
> > > +               if (submit_success) {
> > > +                       switch (atomic_read(&kcompress[i].running))
> > > {
> > > +                       case KCOMPRESSD_NOT_STARTED:
> > > +                               atomic_set(&kcompress[i].running,
> > > KCOMPRESSD_RUNNING);
> > > +                               kcompress[i].kcompressd =
> > > kthread_run(kcompressd,
> > > +
> > > &kcompressd_para[i], "kcompressd:%d", i);
> > > +                               if
> > > (IS_ERR(kcompress[i].kcompressd)) {
> > > +
> > > atomic_set(&kcompress[i].running, KCOMPRESSD_NOT_STARTED);
> > > +                                       pr_warn("Failed to start
> > > kcompressd:%d\n", i);
> > > +                                       clean_bio_queue(i);
> > > +                               }
> > > +                               break;
> > > +                       case KCOMPRESSD_RUNNING:
> > > +                               break;
> > > +                       case KCOMPRESSD_SLEEPING:
> > > +
> > > wake_up_interruptible(&kcompress[i].kcompressd_wait);
> > > +                               break;
> > > +                       }
> > > +                       return 0;
> > > +               }
> > > +       }
> > > +
> > > +       bio_put(bio);
> > > +       return -EBUSY;
> > > +}
> > > +EXPORT_SYMBOL(schedule_bio_write);
> > > +
> > > +static int __init kcompressd_init(void)
> > > +{
> > > +       int ret;
> > > +
> > > +       nr_kcompressd = DEFAULT_NR_KCOMPRESSD;
> > > +       queue_size_per_kcompressd = INIT_QUEUE_SIZE;
> > > +
> > > +       ret = kcompress_update();
> > > +       if (ret) {
> > > +               pr_err("Init kcompressd failed!\n");
> > > +               return ret;
> > > +       }
> > > +
> > > +       atomic_set(&enable_kcompressd, true);
> > > +       blocking_notifier_call_chain(&kcompressd_notifier_list, 0,
> > > NULL);
> > > +       return 0;
> > > +}
> > > +
> > > +static void __exit kcompressd_exit(void)
> > > +{
> > > +       atomic_set(&enable_kcompressd, false);
> > > +       stop_all_kcompressd_thread();
> > > +
> > > +       kvfree(kcompress);
> > > +       kvfree(kcompressd_para);
> > > +}
> > > +
> > > +module_init(kcompressd_init);
> > > +module_exit(kcompressd_exit);
> > > +
> > > +MODULE_LICENSE("Dual BSD/GPL");
> > > +MODULE_AUTHOR("Qun-Wei Lin <qun-wei.lin@mediatek.com>");
> > > +MODULE_DESCRIPTION("Separate the page compression from the memory
> > > reclaiming");
> > > +
> > > diff --git a/drivers/block/zram/kcompressd.h
> > > b/drivers/block/zram/kcompressd.h
> > > new file mode 100644
> > > index 000000000000..2fe0b424a7af
> > > --- /dev/null
> > > +++ b/drivers/block/zram/kcompressd.h
> > > @@ -0,0 +1,25 @@
> > > +/* SPDX-License-Identifier: GPL-2.0 */
> > > +/*
> > > + * Copyright (C) 2024 MediaTek Inc.
> > > + */
> > > +
> > > +#ifndef _KCOMPRESSD_H_
> > > +#define _KCOMPRESSD_H_
> > > +
> > > +#include <linux/rwsem.h>
> > > +#include <linux/kfifo.h>
> > > +#include <linux/atomic.h>
> > > +
> > > +typedef void (*compress_callback)(void *mem, struct bio *bio);
> > > +
> > > +struct kcompress {
> > > +       struct task_struct *kcompressd;
> > > +       wait_queue_head_t kcompressd_wait;
> > > +       struct kfifo write_fifo;
> > > +       atomic_t running;
> > > +};
> > > +
> > > +int kcompressd_enabled(void);
> > > +int schedule_bio_write(void *mem, struct bio *bio,
> > > compress_callback cb);
> > > +#endif
> > > +
> > > diff --git a/drivers/block/zram/zram_drv.c
> > > b/drivers/block/zram/zram_drv.c
> > > index 2e1a70f2f4bd..bcd63ecb6ff2 100644
> > > --- a/drivers/block/zram/zram_drv.c
> > > +++ b/drivers/block/zram/zram_drv.c
> > > @@ -35,6 +35,7 @@
> > >  #include <linux/part_stat.h>
> > >  #include <linux/kernel_read_file.h>
> > >
> > > +#include "kcompressd.h"
> > >  #include "zram_drv.h"
> > >
> > >  static DEFINE_IDR(zram_index_idr);
> > > @@ -2240,6 +2241,15 @@ static void zram_bio_write(struct zram
> > > *zram, struct bio *bio)
> > >         bio_endio(bio);
> > >  }
> > >
> > > +#if IS_ENABLED(CONFIG_KCOMPRESSD)
> > > +static void zram_bio_write_callback(void *mem, struct bio *bio)
> > > +{
> > > +       struct zram *zram = (struct zram *)mem;
> > > +
> > > +       zram_bio_write(zram, bio);
> > > +}
> > > +#endif
> > > +
> > >  /*
> > >   * Handler function for all zram I/O requests.
> > >   */
> > > @@ -2252,6 +2262,10 @@ static void zram_submit_bio(struct bio *bio)
> > >                 zram_bio_read(zram, bio);
> > >                 break;
> > >         case REQ_OP_WRITE:
> > > +#if IS_ENABLED(CONFIG_KCOMPRESSD)
> > > +               if (kcompressd_enabled() &&
> > > !schedule_bio_write(zram, bio, zram_bio_write_callback))
> > > +                       break;
> > > +#endif
> > >                 zram_bio_write(zram, bio);
> > >                 break;
> > >         case REQ_OP_DISCARD:
> > > @@ -2535,9 +2549,11 @@ static int zram_add(void)
> > >  #if ZRAM_LOGICAL_BLOCK_SIZE == PAGE_SIZE
> > >                 .max_write_zeroes_sectors       = UINT_MAX,
> > >  #endif
> > > -               .features                       =
> > > BLK_FEAT_STABLE_WRITES        |
> > > -
> > > BLK_FEAT_READ_SYNCHRONOUS     |
> > > -
> > > BLK_FEAT_WRITE_SYNCHRONOUS,
> > > +               .features                       =
> > > BLK_FEAT_STABLE_WRITES
> > > +                                                 |
> > > BLK_FEAT_READ_SYNCHRONOUS
> > > +#if !IS_ENABLED(CONFIG_KCOMPRESSD)
> > > +                                                 |
> > > BLK_FEAT_WRITE_SYNCHRONOUS,
> > > +#endif
> > >         };
> > >         struct zram *zram;
> > >         int ret, device_id;
> > > --
> > > 2.45.2
> > >
> >

Thanks
Barry
Barry Song March 11, 2025, 7:25 a.m. UTC | #8
On Tue, Mar 11, 2025 at 8:05 PM Barry Song <21cnbao@gmail.com> wrote:
>
> On Tue, Mar 11, 2025 at 2:26 AM Qun-wei Lin (林群崴)
> <Qun-wei.Lin@mediatek.com> wrote:
> >
> > On Sat, 2025-03-08 at 08:41 +1300, Barry Song wrote:
> > >
> > > External email : Please do not click links or open attachments until
> > > you have verified the sender or the content.
> > >
> > >
> > > On Sat, Mar 8, 2025 at 1:02 AM Qun-Wei Lin <qun-wei.lin@mediatek.com>
> > > wrote:
> > > >
> > > > Introduced Kcompressd to offload zram page compression, improving
> > > > system efficiency by handling compression separately from memory
> > > > reclaiming. Added necessary configurations and dependencies.
> > > >
> > > > Signed-off-by: Qun-Wei Lin <qun-wei.lin@mediatek.com>
> > > > ---
> > > >  drivers/block/zram/Kconfig      |  11 ++
> > > >  drivers/block/zram/Makefile     |   3 +-
> > > >  drivers/block/zram/kcompressd.c | 340
> > > > ++++++++++++++++++++++++++++++++
> > > >  drivers/block/zram/kcompressd.h |  25 +++
> > > >  drivers/block/zram/zram_drv.c   |  22 ++-
> > > >  5 files changed, 397 insertions(+), 4 deletions(-)
> > > >  create mode 100644 drivers/block/zram/kcompressd.c
> > > >  create mode 100644 drivers/block/zram/kcompressd.h
> > > >
> > > > diff --git a/drivers/block/zram/Kconfig
> > > > b/drivers/block/zram/Kconfig
> > > > index 402b7b175863..f0a1b574f770 100644
> > > > --- a/drivers/block/zram/Kconfig
> > > > +++ b/drivers/block/zram/Kconfig
> > > > @@ -145,3 +145,14 @@ config ZRAM_MULTI_COMP
> > > >           re-compress pages using a potentially slower but more
> > > > effective
> > > >           compression algorithm. Note, that IDLE page recompression
> > > >           requires ZRAM_TRACK_ENTRY_ACTIME.
> > > > +
> > > > +config KCOMPRESSD
> > > > +       tristate "Kcompressd: Accelerated zram compression"
> > > > +       depends on ZRAM
> > > > +       help
> > > > +         Kcompressd creates multiple daemons to accelerate the
> > > > compression of pages
> > > > +         in zram, offloading this time-consuming task from the
> > > > zram driver.
> > > > +
> > > > +         This approach improves system efficiency by handling page
> > > > compression separately,
> > > > +         which was originally done by kswapd or direct reclaim.
> > >
> > > For direct reclaim, we were previously able to compress using
> > > multiple CPUs
> > > with multi-threading.
> > > After your patch, it seems that only a single thread/CPU is used for
> > > compression
> > > so it won't necessarily improve direct reclaim performance?
> > >
> >
> > Our patch only splits the context of kswapd. When direct reclaim is
> > occurred, it is bypassed, so direct reclaim remains unchanged, with
> > each thread that needs memory directly reclaiming it.
>
> Qun-wei, I’m getting a bit confused. Looking at the code in page_io.c,
> you always call swap_writepage_bdev_async() no matter if it is kswapd
> or direct reclaim:
>
> - else if (data_race(sis->flags & SWP_SYNCHRONOUS_IO))
> + else if (data_race(sis->flags & SWP_WRITE_SYNCHRONOUS_IO))
>            swap_writepage_bdev_sync(folio, wbc, sis);
>   else
>             swap_writepage_bdev_async(folio, wbc, sis);
>
> In zram, I notice you are bypassing kcompressd by:
>
> + if (!nr_kcompressd || !current_is_kswapd())
> +        return -EBUSY;
>
> How will this work if no one is calling __end_swap_bio_write(&bio),
> which is present in swap_writepage_bdev_sync()?
> Am I missing something? Or is it done by zram_bio_write() ?
>
> On the other hand, zram is a generic block device, and coupling its
> code with kswapd/direct reclaim somehow violates layering
> principles :-)
>
> >
> > > Even for kswapd, we used to have multiple threads like [kswapd0],
> > > [kswapd1],
> > > and [kswapd2] for different nodes. Now, are we also limited to just
> > > one thread?
> >
> > We only considered a single kswapd here and didn't account for multiple
> > instances. Since I use kfifo to collect the bios, if there are multiple
> > kswapds, we need to add a lock to protect the bio queue. I can revise
> > this in the 2nd version, or do you have any other suggested approaches?
>
> I'm wondering if we can move the code to vmscan/page_io instead
> of zram. If we're using a sync I/O swap device or have enabled zswap,
> we could run reclamation in this separate thread, which should also be

Sorry for the typo:
s/reclamation/__swap_writepage/g

> NUMA-aware.
>
> I would definitely be interested in prototyping it when I have the time.
>

Thanks
Barry
Qun-wei Lin (林群崴) March 11, 2025, 2:33 p.m. UTC | #9
On Tue, 2025-03-11 at 20:05 +1300, Barry Song wrote:
> 
> External email : Please do not click links or open attachments until
> you have verified the sender or the content.
> 
> 
> On Tue, Mar 11, 2025 at 2:26 AM Qun-wei Lin (林群崴)
> <Qun-wei.Lin@mediatek.com> wrote:
> > 
> > On Sat, 2025-03-08 at 08:41 +1300, Barry Song wrote:
> > > 
> > > External email : Please do not click links or open attachments
> > > until
> > > you have verified the sender or the content.
> > > 
> > > 
> > > On Sat, Mar 8, 2025 at 1:02 AM Qun-Wei Lin
> > > <qun-wei.lin@mediatek.com>
> > > wrote:
> > > > 
> > > > Introduced Kcompressd to offload zram page compression,
> > > > improving
> > > > system efficiency by handling compression separately from
> > > > memory
> > > > reclaiming. Added necessary configurations and dependencies.
> > > > 
> > > > Signed-off-by: Qun-Wei Lin <qun-wei.lin@mediatek.com>
> > > > ---
> > > >  drivers/block/zram/Kconfig      |  11 ++
> > > >  drivers/block/zram/Makefile     |   3 +-
> > > >  drivers/block/zram/kcompressd.c | 340
> > > > ++++++++++++++++++++++++++++++++
> > > >  drivers/block/zram/kcompressd.h |  25 +++
> > > >  drivers/block/zram/zram_drv.c   |  22 ++-
> > > >  5 files changed, 397 insertions(+), 4 deletions(-)
> > > >  create mode 100644 drivers/block/zram/kcompressd.c
> > > >  create mode 100644 drivers/block/zram/kcompressd.h
> > > > 
> > > > diff --git a/drivers/block/zram/Kconfig
> > > > b/drivers/block/zram/Kconfig
> > > > index 402b7b175863..f0a1b574f770 100644
> > > > --- a/drivers/block/zram/Kconfig
> > > > +++ b/drivers/block/zram/Kconfig
> > > > @@ -145,3 +145,14 @@ config ZRAM_MULTI_COMP
> > > >           re-compress pages using a potentially slower but more
> > > > effective
> > > >           compression algorithm. Note, that IDLE page
> > > > recompression
> > > >           requires ZRAM_TRACK_ENTRY_ACTIME.
> > > > +
> > > > +config KCOMPRESSD
> > > > +       tristate "Kcompressd: Accelerated zram compression"
> > > > +       depends on ZRAM
> > > > +       help
> > > > +         Kcompressd creates multiple daemons to accelerate the
> > > > compression of pages
> > > > +         in zram, offloading this time-consuming task from the
> > > > zram driver.
> > > > +
> > > > +         This approach improves system efficiency by handling
> > > > page
> > > > compression separately,
> > > > +         which was originally done by kswapd or direct
> > > > reclaim.
> > > 
> > > For direct reclaim, we were previously able to compress using
> > > multiple CPUs
> > > with multi-threading.
> > > After your patch, it seems that only a single thread/CPU is used
> > > for
> > > compression
> > > so it won't necessarily improve direct reclaim performance?
> > > 
> > 
> > Our patch only splits the context of kswapd. When direct reclaim is
> > occurred, it is bypassed, so direct reclaim remains unchanged, with
> > each thread that needs memory directly reclaiming it.
> 
> Qun-wei, I’m getting a bit confused. Looking at the code in
> page_io.c,
> you always call swap_writepage_bdev_async() no matter if it is kswapd
> or direct reclaim:
> 
> - else if (data_race(sis->flags & SWP_SYNCHRONOUS_IO))
> + else if (data_race(sis->flags & SWP_WRITE_SYNCHRONOUS_IO))
>            swap_writepage_bdev_sync(folio, wbc, sis);
>   else
>             swap_writepage_bdev_async(folio, wbc, sis);
> 
> In zram, I notice you are bypassing kcompressd by:
> 
> + if (!nr_kcompressd || !current_is_kswapd())
> +        return -EBUSY;
> 
> How will this work if no one is calling __end_swap_bio_write(&bio),
> which is present in swap_writepage_bdev_sync()?
> Am I missing something? Or is it done by zram_bio_write() ?
> 

In swap_writepage_bdev_async(), 'bio->bi_end_io = end_swap_bio_write'
is set. So yes, the bio_endio(bio) in zram_bio_write() will handle
this.

> On the other hand, zram is a generic block device, and coupling its
> code with kswapd/direct reclaim somehow violates layering
> principles :-)

OK, so it seems that moving kcompressd to the vmscan level might
resolve this issue, right?


> > 
> > > Even for kswapd, we used to have multiple threads like [kswapd0],
> > > [kswapd1],
> > > and [kswapd2] for different nodes. Now, are we also limited to
> > > just
> > > one thread?
> > 
> > We only considered a single kswapd here and didn't account for
> > multiple
> > instances. Since I use kfifo to collect the bios, if there are
> > multiple
> > kswapds, we need to add a lock to protect the bio queue. I can
> > revise
> > this in the 2nd version, or do you have any other suggested
> > approaches?
> 
> I'm wondering if we can move the code to vmscan/page_io instead
> of zram. If we're using a sync I/O swap device or have enabled zswap,
> we could run reclamation in this separate thread, which should also
> be
> NUMA-aware.
> 
> I would definitely be interested in prototyping it when I have the
> time.

Thank you so much!

> 
> > 
> > > I also wonder if this could be handled at the vmscan level
> > > instead of
> > > the zram
> > > level. then it might potentially help other sync devices or even
> > > zswap later.
> > > 
> > > But I agree that for phones, modifying zram seems like an easier
> > > starting
> > > point. However, relying on a single thread isn't always the best
> > > approach.
> > > 
> > > 
> > > > +
> > > > diff --git a/drivers/block/zram/Makefile
> > > > b/drivers/block/zram/Makefile
> > > > index 0fdefd576691..23baa5dfceb9 100644
> > > > --- a/drivers/block/zram/Makefile
> > > > +++ b/drivers/block/zram/Makefile
> > > > @@ -9,4 +9,5 @@ zram-$(CONFIG_ZRAM_BACKEND_ZSTD)        +=
> > > > backend_zstd.o
> > > >  zram-$(CONFIG_ZRAM_BACKEND_DEFLATE)    += backend_deflate.o
> > > >  zram-$(CONFIG_ZRAM_BACKEND_842)                +=
> > > > backend_842.o
> > > > 
> > > > -obj-$(CONFIG_ZRAM)     +=      zram.o
> > > > +obj-$(CONFIG_ZRAM)             += zram.o
> > > > +obj-$(CONFIG_KCOMPRESSD)       += kcompressd.o
> > > > diff --git a/drivers/block/zram/kcompressd.c
> > > > b/drivers/block/zram/kcompressd.c
> > > > new file mode 100644
> > > > index 000000000000..195b7e386869
> > > > --- /dev/null
> > > > +++ b/drivers/block/zram/kcompressd.c
> > > > @@ -0,0 +1,340 @@
> > > > +// SPDX-License-Identifier: GPL-2.0
> > > > +/*
> > > > + * Copyright (C) 2024 MediaTek Inc.
> > > > + */
> > > > +
> > > > +#include <linux/module.h>
> > > > +#include <linux/kernel.h>
> > > > +#include <linux/bio.h>
> > > > +#include <linux/bitops.h>
> > > > +#include <linux/freezer.h>
> > > > +#include <linux/kernel.h>
> > > > +#include <linux/psi.h>
> > > > +#include <linux/kfifo.h>
> > > > +#include <linux/swap.h>
> > > > +#include <linux/delay.h>
> > > > +
> > > > +#include "kcompressd.h"
> > > > +
> > > > +#define INIT_QUEUE_SIZE                4096
> > > > +#define DEFAULT_NR_KCOMPRESSD  4
> > > > +
> > > > +static atomic_t enable_kcompressd;
> > > > +static unsigned int nr_kcompressd;
> > > > +static unsigned int queue_size_per_kcompressd;
> > > > +static struct kcompress *kcompress;
> > > > +
> > > > +enum run_state {
> > > > +       KCOMPRESSD_NOT_STARTED = 0,
> > > > +       KCOMPRESSD_RUNNING,
> > > > +       KCOMPRESSD_SLEEPING,
> > > > +};
> > > > +
> > > > +struct kcompressd_para {
> > > > +       wait_queue_head_t *kcompressd_wait;
> > > > +       struct kfifo *write_fifo;
> > > > +       atomic_t *running;
> > > > +};
> > > > +
> > > > +static struct kcompressd_para *kcompressd_para;
> > > > +static BLOCKING_NOTIFIER_HEAD(kcompressd_notifier_list);
> > > > +
> > > > +struct write_work {
> > > > +       void *mem;
> > > > +       struct bio *bio;
> > > > +       compress_callback cb;
> > > > +};
> > > > +
> > > > +int kcompressd_enabled(void)
> > > > +{
> > > > +       return likely(atomic_read(&enable_kcompressd));
> > > > +}
> > > > +EXPORT_SYMBOL(kcompressd_enabled);
> > > > +
> > > > +static void kcompressd_try_to_sleep(struct kcompressd_para *p)
> > > > +{
> > > > +       DEFINE_WAIT(wait);
> > > > +
> > > > +       if (!kfifo_is_empty(p->write_fifo))
> > > > +               return;
> > > > +
> > > > +       if (freezing(current) || kthread_should_stop())
> > > > +               return;
> > > > +
> > > > +       atomic_set(p->running, KCOMPRESSD_SLEEPING);
> > > > +       prepare_to_wait(p->kcompressd_wait, &wait,
> > > > TASK_INTERRUPTIBLE);
> > > > +
> > > > +       /*
> > > > +        * After a short sleep, check if it was a premature
> > > > sleep.
> > > > If not, then
> > > > +        * go fully to sleep until explicitly woken up.
> > > > +        */
> > > > +       if (!kthread_should_stop() && kfifo_is_empty(p-
> > > > > write_fifo))
> > > > +               schedule();
> > > > +
> > > > +       finish_wait(p->kcompressd_wait, &wait);
> > > > +       atomic_set(p->running, KCOMPRESSD_RUNNING);
> > > > +}
> > > > +
> > > > +static int kcompressd(void *para)
> > > > +{
> > > > +       struct task_struct *tsk = current;
> > > > +       struct kcompressd_para *p = (struct kcompressd_para
> > > > *)para;
> > > > +
> > > > +       tsk->flags |= PF_MEMALLOC | PF_KSWAPD;
> > > > +       set_freezable();
> > > > +
> > > > +       while (!kthread_should_stop()) {
> > > > +               bool ret;
> > > > +
> > > > +               kcompressd_try_to_sleep(p);
> > > > +               ret = try_to_freeze();
> > > > +               if (kthread_should_stop())
> > > > +                       break;
> > > > +
> > > > +               if (ret)
> > > > +                       continue;
> > > > +
> > > > +               while (!kfifo_is_empty(p->write_fifo)) {
> > > > +                       struct write_work entry;
> > > > +
> > > > +                       if (sizeof(struct write_work) ==
> > > > kfifo_out(p->write_fifo,
> > > > +                                               &entry,
> > > > sizeof(struct write_work))) {
> > > > +                               entry.cb(entry.mem, entry.bio);
> > > > +                               bio_put(entry.bio);
> > > > +                       }
> > > > +               }
> > > > +
> > > > +       }
> > > > +
> > > > +       tsk->flags &= ~(PF_MEMALLOC | PF_KSWAPD);
> > > > +       atomic_set(p->running, KCOMPRESSD_NOT_STARTED);
> > > > +       return 0;
> > > > +}
> > > > +
> > > > +static int init_write_queue(void)
> > > > +{
> > > > +       int i;
> > > > +       unsigned int queue_len = queue_size_per_kcompressd *
> > > > sizeof(struct write_work);
> > > > +
> > > > +       for (i = 0; i < nr_kcompressd; i++) {
> > > > +               if (kfifo_alloc(&kcompress[i].write_fifo,
> > > > +                                       queue_len, GFP_KERNEL))
> > > > {
> > > > +                       pr_err("Failed to alloc kfifo %d\n",
> > > > i);
> > > > +                       return -ENOMEM;
> > > > +               }
> > > > +       }
> > > > +       return 0;
> > > > +}
> > > > +
> > > > +static void clean_bio_queue(int idx)
> > > > +{
> > > > +       struct write_work entry;
> > > > +
> > > > +       while (sizeof(struct write_work) ==
> > > > kfifo_out(&kcompress[idx].write_fifo,
> > > > +                               &entry, sizeof(struct
> > > > write_work)))
> > > > {
> > > > +               bio_put(entry.bio);
> > > > +               entry.cb(entry.mem, entry.bio);
> > > > +       }
> > > > +       kfifo_free(&kcompress[idx].write_fifo);
> > > > +}
> > > > +
> > > > +static int kcompress_update(void)
> > > > +{
> > > > +       int i;
> > > > +       int ret;
> > > > +
> > > > +       kcompress = kvmalloc_array(nr_kcompressd, sizeof(struct
> > > > kcompress), GFP_KERNEL);
> > > > +       if (!kcompress)
> > > > +               return -ENOMEM;
> > > > +
> > > > +       kcompressd_para = kvmalloc_array(nr_kcompressd,
> > > > sizeof(struct kcompressd_para), GFP_KERNEL);
> > > > +       if (!kcompressd_para)
> > > > +               return -ENOMEM;
> > > > +
> > > > +       ret = init_write_queue();
> > > > +       if (ret) {
> > > > +               pr_err("Initialization of writing to FIFOs
> > > > failed!!\n");
> > > > +               return ret;
> > > > +       }
> > > > +
> > > > +       for (i = 0; i < nr_kcompressd; i++) {
> > > > +              
> > > > init_waitqueue_head(&kcompress[i].kcompressd_wait);
> > > > +               kcompressd_para[i].kcompressd_wait =
> > > > &kcompress[i].kcompressd_wait;
> > > > +               kcompressd_para[i].write_fifo =
> > > > &kcompress[i].write_fifo;
> > > > +               kcompressd_para[i].running =
> > > > &kcompress[i].running;
> > > > +       }
> > > > +
> > > > +       return 0;
> > > > +}
> > > > +
> > > > +static void stop_all_kcompressd_thread(void)
> > > > +{
> > > > +       int i;
> > > > +
> > > > +       for (i = 0; i < nr_kcompressd; i++) {
> > > > +               kthread_stop(kcompress[i].kcompressd);
> > > > +               kcompress[i].kcompressd = NULL;
> > > > +               clean_bio_queue(i);
> > > > +       }
> > > > +}
> > > > +
> > > > +static int do_nr_kcompressd_handler(const char *val,
> > > > +               const struct kernel_param *kp)
> > > > +{
> > > > +       int ret;
> > > > +
> > > > +       atomic_set(&enable_kcompressd, false);
> > > > +
> > > > +       stop_all_kcompressd_thread();
> > > > +
> > > > +       ret = param_set_int(val, kp);
> > > > +       if (!ret) {
> > > > +               pr_err("Invalid number of kcompressd.\n");
> > > > +               return -EINVAL;
> > > > +       }
> > > > +
> > > > +       ret = init_write_queue();
> > > > +       if (ret) {
> > > > +               pr_err("Initialization of writing to FIFOs
> > > > failed!!\n");
> > > > +               return ret;
> > > > +       }
> > > > +
> > > > +       atomic_set(&enable_kcompressd, true);
> > > > +
> > > > +       return 0;
> > > > +}
> > > > +
> > > > +static const struct kernel_param_ops
> > > > param_ops_change_nr_kcompressd = {
> > > > +       .set = &do_nr_kcompressd_handler,
> > > > +       .get = &param_get_uint,
> > > > +       .free = NULL,
> > > > +};
> > > > +
> > > > +module_param_cb(nr_kcompressd,
> > > > &param_ops_change_nr_kcompressd,
> > > > +               &nr_kcompressd, 0644);
> > > > +MODULE_PARM_DESC(nr_kcompressd, "Number of pre-created daemon
> > > > for
> > > > page compression");
> > > > +
> > > > +static int do_queue_size_per_kcompressd_handler(const char
> > > > *val,
> > > > +               const struct kernel_param *kp)
> > > > +{
> > > > +       int ret;
> > > > +
> > > > +       atomic_set(&enable_kcompressd, false);
> > > > +
> > > > +       stop_all_kcompressd_thread();
> > > > +
> > > > +       ret = param_set_int(val, kp);
> > > > +       if (!ret) {
> > > > +               pr_err("Invalid queue size for kcompressd.\n");
> > > > +               return -EINVAL;
> > > > +       }
> > > > +
> > > > +       ret = init_write_queue();
> > > > +       if (ret) {
> > > > +               pr_err("Initialization of writing to FIFOs
> > > > failed!!\n");
> > > > +               return ret;
> > > > +       }
> > > > +
> > > > +       pr_info("Queue size for kcompressd was changed: %d\n",
> > > > queue_size_per_kcompressd);
> > > > +
> > > > +       atomic_set(&enable_kcompressd, true);
> > > > +       return 0;
> > > > +}
> > > > +
> > > > +static const struct kernel_param_ops
> > > > param_ops_change_queue_size_per_kcompressd = {
> > > > +       .set = &do_queue_size_per_kcompressd_handler,
> > > > +       .get = &param_get_uint,
> > > > +       .free = NULL,
> > > > +};
> > > > +
> > > > +module_param_cb(queue_size_per_kcompressd,
> > > > &param_ops_change_queue_size_per_kcompressd,
> > > > +               &queue_size_per_kcompressd, 0644);
> > > > +MODULE_PARM_DESC(queue_size_per_kcompressd,
> > > > +               "Size of queue for kcompressd");
> > > > +
> > > > +int schedule_bio_write(void *mem, struct bio *bio,
> > > > compress_callback cb)
> > > > +{
> > > > +       int i;
> > > > +       bool submit_success = false;
> > > > +       size_t sz_work = sizeof(struct write_work);
> > > > +
> > > > +       struct write_work entry = {
> > > > +               .mem = mem,
> > > > +               .bio = bio,
> > > > +               .cb = cb
> > > > +       };
> > > > +
> > > > +       if (unlikely(!atomic_read(&enable_kcompressd)))
> > > > +               return -EBUSY;
> > > > +
> > > > +       if (!nr_kcompressd || !current_is_kswapd())
> > > > +               return -EBUSY;
> > > > +
> > > > +       bio_get(bio);
> > > > +
> > > > +       for (i = 0; i < nr_kcompressd; i++) {
> > > > +               submit_success =
> > > > +                       (kfifo_avail(&kcompress[i].write_fifo)
> > > > >=
> > > > sz_work) &&
> > > > +                       (sz_work ==
> > > > kfifo_in(&kcompress[i].write_fifo, &entry, sz_work));
> > > > +
> > > > +               if (submit_success) {
> > > > +                       switch
> > > > (atomic_read(&kcompress[i].running))
> > > > {
> > > > +                       case KCOMPRESSD_NOT_STARTED:
> > > > +                              
> > > > atomic_set(&kcompress[i].running,
> > > > KCOMPRESSD_RUNNING);
> > > > +                               kcompress[i].kcompressd =
> > > > kthread_run(kcompressd,
> > > > +
> > > > &kcompressd_para[i], "kcompressd:%d", i);
> > > > +                               if
> > > > (IS_ERR(kcompress[i].kcompressd)) {
> > > > +
> > > > atomic_set(&kcompress[i].running, KCOMPRESSD_NOT_STARTED);
> > > > +                                       pr_warn("Failed to
> > > > start
> > > > kcompressd:%d\n", i);
> > > > +                                       clean_bio_queue(i);
> > > > +                               }
> > > > +                               break;
> > > > +                       case KCOMPRESSD_RUNNING:
> > > > +                               break;
> > > > +                       case KCOMPRESSD_SLEEPING:
> > > > +
> > > > wake_up_interruptible(&kcompress[i].kcompressd_wait);
> > > > +                               break;
> > > > +                       }
> > > > +                       return 0;
> > > > +               }
> > > > +       }
> > > > +
> > > > +       bio_put(bio);
> > > > +       return -EBUSY;
> > > > +}
> > > > +EXPORT_SYMBOL(schedule_bio_write);
> > > > +
> > > > +static int __init kcompressd_init(void)
> > > > +{
> > > > +       int ret;
> > > > +
> > > > +       nr_kcompressd = DEFAULT_NR_KCOMPRESSD;
> > > > +       queue_size_per_kcompressd = INIT_QUEUE_SIZE;
> > > > +
> > > > +       ret = kcompress_update();
> > > > +       if (ret) {
> > > > +               pr_err("Init kcompressd failed!\n");
> > > > +               return ret;
> > > > +       }
> > > > +
> > > > +       atomic_set(&enable_kcompressd, true);
> > > > +       blocking_notifier_call_chain(&kcompressd_notifier_list,
> > > > 0,
> > > > NULL);
> > > > +       return 0;
> > > > +}
> > > > +
> > > > +static void __exit kcompressd_exit(void)
> > > > +{
> > > > +       atomic_set(&enable_kcompressd, false);
> > > > +       stop_all_kcompressd_thread();
> > > > +
> > > > +       kvfree(kcompress);
> > > > +       kvfree(kcompressd_para);
> > > > +}
> > > > +
> > > > +module_init(kcompressd_init);
> > > > +module_exit(kcompressd_exit);
> > > > +
> > > > +MODULE_LICENSE("Dual BSD/GPL");
> > > > +MODULE_AUTHOR("Qun-Wei Lin <qun-wei.lin@mediatek.com>");
> > > > +MODULE_DESCRIPTION("Separate the page compression from the
> > > > memory
> > > > reclaiming");
> > > > +
> > > > diff --git a/drivers/block/zram/kcompressd.h
> > > > b/drivers/block/zram/kcompressd.h
> > > > new file mode 100644
> > > > index 000000000000..2fe0b424a7af
> > > > --- /dev/null
> > > > +++ b/drivers/block/zram/kcompressd.h
> > > > @@ -0,0 +1,25 @@
> > > > +/* SPDX-License-Identifier: GPL-2.0 */
> > > > +/*
> > > > + * Copyright (C) 2024 MediaTek Inc.
> > > > + */
> > > > +
> > > > +#ifndef _KCOMPRESSD_H_
> > > > +#define _KCOMPRESSD_H_
> > > > +
> > > > +#include <linux/rwsem.h>
> > > > +#include <linux/kfifo.h>
> > > > +#include <linux/atomic.h>
> > > > +
> > > > +typedef void (*compress_callback)(void *mem, struct bio *bio);
> > > > +
> > > > +struct kcompress {
> > > > +       struct task_struct *kcompressd;
> > > > +       wait_queue_head_t kcompressd_wait;
> > > > +       struct kfifo write_fifo;
> > > > +       atomic_t running;
> > > > +};
> > > > +
> > > > +int kcompressd_enabled(void);
> > > > +int schedule_bio_write(void *mem, struct bio *bio,
> > > > compress_callback cb);
> > > > +#endif
> > > > +
> > > > diff --git a/drivers/block/zram/zram_drv.c
> > > > b/drivers/block/zram/zram_drv.c
> > > > index 2e1a70f2f4bd..bcd63ecb6ff2 100644
> > > > --- a/drivers/block/zram/zram_drv.c
> > > > +++ b/drivers/block/zram/zram_drv.c
> > > > @@ -35,6 +35,7 @@
> > > >  #include <linux/part_stat.h>
> > > >  #include <linux/kernel_read_file.h>
> > > > 
> > > > +#include "kcompressd.h"
> > > >  #include "zram_drv.h"
> > > > 
> > > >  static DEFINE_IDR(zram_index_idr);
> > > > @@ -2240,6 +2241,15 @@ static void zram_bio_write(struct zram
> > > > *zram, struct bio *bio)
> > > >         bio_endio(bio);
> > > >  }
> > > > 
> > > > +#if IS_ENABLED(CONFIG_KCOMPRESSD)
> > > > +static void zram_bio_write_callback(void *mem, struct bio
> > > > *bio)
> > > > +{
> > > > +       struct zram *zram = (struct zram *)mem;
> > > > +
> > > > +       zram_bio_write(zram, bio);
> > > > +}
> > > > +#endif
> > > > +
> > > >  /*
> > > >   * Handler function for all zram I/O requests.
> > > >   */
> > > > @@ -2252,6 +2262,10 @@ static void zram_submit_bio(struct bio
> > > > *bio)
> > > >                 zram_bio_read(zram, bio);
> > > >                 break;
> > > >         case REQ_OP_WRITE:
> > > > +#if IS_ENABLED(CONFIG_KCOMPRESSD)
> > > > +               if (kcompressd_enabled() &&
> > > > !schedule_bio_write(zram, bio, zram_bio_write_callback))
> > > > +                       break;
> > > > +#endif
> > > >                 zram_bio_write(zram, bio);
> > > >                 break;
> > > >         case REQ_OP_DISCARD:
> > > > @@ -2535,9 +2549,11 @@ static int zram_add(void)
> > > >  #if ZRAM_LOGICAL_BLOCK_SIZE == PAGE_SIZE
> > > >                 .max_write_zeroes_sectors       = UINT_MAX,
> > > >  #endif
> > > > -               .features                       =
> > > > BLK_FEAT_STABLE_WRITES        |
> > > > -
> > > > BLK_FEAT_READ_SYNCHRONOUS     |
> > > > -
> > > > BLK_FEAT_WRITE_SYNCHRONOUS,
> > > > +               .features                       =
> > > > BLK_FEAT_STABLE_WRITES
> > > > +                                                 |
> > > > BLK_FEAT_READ_SYNCHRONOUS
> > > > +#if !IS_ENABLED(CONFIG_KCOMPRESSD)
> > > > +                                                 |
> > > > BLK_FEAT_WRITE_SYNCHRONOUS,
> > > > +#endif
> > > >         };
> > > >         struct zram *zram;
> > > >         int ret, device_id;
> > > > --
> > > > 2.45.2
> > > > 
> > > 
> 
> Thanks
> Barry

Best Regards,
Qun-wei
diff mbox series

Patch

diff --git a/drivers/block/zram/Kconfig b/drivers/block/zram/Kconfig
index 402b7b175863..f0a1b574f770 100644
--- a/drivers/block/zram/Kconfig
+++ b/drivers/block/zram/Kconfig
@@ -145,3 +145,14 @@  config ZRAM_MULTI_COMP
 	  re-compress pages using a potentially slower but more effective
 	  compression algorithm. Note, that IDLE page recompression
 	  requires ZRAM_TRACK_ENTRY_ACTIME.
+
+config KCOMPRESSD
+	tristate "Kcompressd: Accelerated zram compression"
+	depends on ZRAM
+	help
+	  Kcompressd creates multiple daemons to accelerate the compression of pages
+	  in zram, offloading this time-consuming task from the zram driver.
+
+	  This approach improves system efficiency by handling page compression separately,
+	  which was originally done by kswapd or direct reclaim.
+
diff --git a/drivers/block/zram/Makefile b/drivers/block/zram/Makefile
index 0fdefd576691..23baa5dfceb9 100644
--- a/drivers/block/zram/Makefile
+++ b/drivers/block/zram/Makefile
@@ -9,4 +9,5 @@  zram-$(CONFIG_ZRAM_BACKEND_ZSTD)	+= backend_zstd.o
 zram-$(CONFIG_ZRAM_BACKEND_DEFLATE)	+= backend_deflate.o
 zram-$(CONFIG_ZRAM_BACKEND_842)		+= backend_842.o
 
-obj-$(CONFIG_ZRAM)	+=	zram.o
+obj-$(CONFIG_ZRAM)		+= zram.o
+obj-$(CONFIG_KCOMPRESSD)	+= kcompressd.o
diff --git a/drivers/block/zram/kcompressd.c b/drivers/block/zram/kcompressd.c
new file mode 100644
index 000000000000..195b7e386869
--- /dev/null
+++ b/drivers/block/zram/kcompressd.c
@@ -0,0 +1,340 @@ 
+// SPDX-License-Identifier: GPL-2.0
+/*
+ * Copyright (C) 2024 MediaTek Inc.
+ */
+
+#include <linux/module.h>
+#include <linux/kernel.h>
+#include <linux/bio.h>
+#include <linux/bitops.h>
+#include <linux/freezer.h>
+#include <linux/kernel.h>
+#include <linux/psi.h>
+#include <linux/kfifo.h>
+#include <linux/swap.h>
+#include <linux/delay.h>
+
+#include "kcompressd.h"
+
+#define INIT_QUEUE_SIZE		4096
+#define DEFAULT_NR_KCOMPRESSD	4
+
+static atomic_t enable_kcompressd;
+static unsigned int nr_kcompressd;
+static unsigned int queue_size_per_kcompressd;
+static struct kcompress *kcompress;
+
+enum run_state {
+	KCOMPRESSD_NOT_STARTED = 0,
+	KCOMPRESSD_RUNNING,
+	KCOMPRESSD_SLEEPING,
+};
+
+struct kcompressd_para {
+	wait_queue_head_t *kcompressd_wait;
+	struct kfifo *write_fifo;
+	atomic_t *running;
+};
+
+static struct kcompressd_para *kcompressd_para;
+static BLOCKING_NOTIFIER_HEAD(kcompressd_notifier_list);
+
+struct write_work {
+	void *mem;
+	struct bio *bio;
+	compress_callback cb;
+};
+
+int kcompressd_enabled(void)
+{
+	return likely(atomic_read(&enable_kcompressd));
+}
+EXPORT_SYMBOL(kcompressd_enabled);
+
+static void kcompressd_try_to_sleep(struct kcompressd_para *p)
+{
+	DEFINE_WAIT(wait);
+
+	if (!kfifo_is_empty(p->write_fifo))
+		return;
+
+	if (freezing(current) || kthread_should_stop())
+		return;
+
+	atomic_set(p->running, KCOMPRESSD_SLEEPING);
+	prepare_to_wait(p->kcompressd_wait, &wait, TASK_INTERRUPTIBLE);
+
+	/*
+	 * After a short sleep, check if it was a premature sleep. If not, then
+	 * go fully to sleep until explicitly woken up.
+	 */
+	if (!kthread_should_stop() && kfifo_is_empty(p->write_fifo))
+		schedule();
+
+	finish_wait(p->kcompressd_wait, &wait);
+	atomic_set(p->running, KCOMPRESSD_RUNNING);
+}
+
+static int kcompressd(void *para)
+{
+	struct task_struct *tsk = current;
+	struct kcompressd_para *p = (struct kcompressd_para *)para;
+
+	tsk->flags |= PF_MEMALLOC | PF_KSWAPD;
+	set_freezable();
+
+	while (!kthread_should_stop()) {
+		bool ret;
+
+		kcompressd_try_to_sleep(p);
+		ret = try_to_freeze();
+		if (kthread_should_stop())
+			break;
+
+		if (ret)
+			continue;
+
+		while (!kfifo_is_empty(p->write_fifo)) {
+			struct write_work entry;
+
+			if (sizeof(struct write_work) == kfifo_out(p->write_fifo,
+						&entry, sizeof(struct write_work))) {
+				entry.cb(entry.mem, entry.bio);
+				bio_put(entry.bio);
+			}
+		}
+
+	}
+
+	tsk->flags &= ~(PF_MEMALLOC | PF_KSWAPD);
+	atomic_set(p->running, KCOMPRESSD_NOT_STARTED);
+	return 0;
+}
+
+static int init_write_queue(void)
+{
+	int i;
+	unsigned int queue_len = queue_size_per_kcompressd * sizeof(struct write_work);
+
+	for (i = 0; i < nr_kcompressd; i++) {
+		if (kfifo_alloc(&kcompress[i].write_fifo,
+					queue_len, GFP_KERNEL)) {
+			pr_err("Failed to alloc kfifo %d\n", i);
+			return -ENOMEM;
+		}
+	}
+	return 0;
+}
+
+static void clean_bio_queue(int idx)
+{
+	struct write_work entry;
+
+	while (sizeof(struct write_work) == kfifo_out(&kcompress[idx].write_fifo,
+				&entry, sizeof(struct write_work))) {
+		bio_put(entry.bio);
+		entry.cb(entry.mem, entry.bio);
+	}
+	kfifo_free(&kcompress[idx].write_fifo);
+}
+
+static int kcompress_update(void)
+{
+	int i;
+	int ret;
+
+	kcompress = kvmalloc_array(nr_kcompressd, sizeof(struct kcompress), GFP_KERNEL);
+	if (!kcompress)
+		return -ENOMEM;
+
+	kcompressd_para = kvmalloc_array(nr_kcompressd, sizeof(struct kcompressd_para), GFP_KERNEL);
+	if (!kcompressd_para)
+		return -ENOMEM;
+
+	ret = init_write_queue();
+	if (ret) {
+		pr_err("Initialization of writing to FIFOs failed!!\n");
+		return ret;
+	}
+
+	for (i = 0; i < nr_kcompressd; i++) {
+		init_waitqueue_head(&kcompress[i].kcompressd_wait);
+		kcompressd_para[i].kcompressd_wait = &kcompress[i].kcompressd_wait;
+		kcompressd_para[i].write_fifo = &kcompress[i].write_fifo;
+		kcompressd_para[i].running = &kcompress[i].running;
+	}
+
+	return 0;
+}
+
+static void stop_all_kcompressd_thread(void)
+{
+	int i;
+
+	for (i = 0; i < nr_kcompressd; i++) {
+		kthread_stop(kcompress[i].kcompressd);
+		kcompress[i].kcompressd = NULL;
+		clean_bio_queue(i);
+	}
+}
+
+static int do_nr_kcompressd_handler(const char *val,
+		const struct kernel_param *kp)
+{
+	int ret;
+
+	atomic_set(&enable_kcompressd, false);
+
+	stop_all_kcompressd_thread();
+
+	ret = param_set_int(val, kp);
+	if (!ret) {
+		pr_err("Invalid number of kcompressd.\n");
+		return -EINVAL;
+	}
+
+	ret = init_write_queue();
+	if (ret) {
+		pr_err("Initialization of writing to FIFOs failed!!\n");
+		return ret;
+	}
+
+	atomic_set(&enable_kcompressd, true);
+
+	return 0;
+}
+
+static const struct kernel_param_ops param_ops_change_nr_kcompressd = {
+	.set = &do_nr_kcompressd_handler,
+	.get = &param_get_uint,
+	.free = NULL,
+};
+
+module_param_cb(nr_kcompressd, &param_ops_change_nr_kcompressd,
+		&nr_kcompressd, 0644);
+MODULE_PARM_DESC(nr_kcompressd, "Number of pre-created daemon for page compression");
+
+static int do_queue_size_per_kcompressd_handler(const char *val,
+		const struct kernel_param *kp)
+{
+	int ret;
+
+	atomic_set(&enable_kcompressd, false);
+
+	stop_all_kcompressd_thread();
+
+	ret = param_set_int(val, kp);
+	if (!ret) {
+		pr_err("Invalid queue size for kcompressd.\n");
+		return -EINVAL;
+	}
+
+	ret = init_write_queue();
+	if (ret) {
+		pr_err("Initialization of writing to FIFOs failed!!\n");
+		return ret;
+	}
+
+	pr_info("Queue size for kcompressd was changed: %d\n", queue_size_per_kcompressd);
+
+	atomic_set(&enable_kcompressd, true);
+	return 0;
+}
+
+static const struct kernel_param_ops param_ops_change_queue_size_per_kcompressd = {
+	.set = &do_queue_size_per_kcompressd_handler,
+	.get = &param_get_uint,
+	.free = NULL,
+};
+
+module_param_cb(queue_size_per_kcompressd, &param_ops_change_queue_size_per_kcompressd,
+		&queue_size_per_kcompressd, 0644);
+MODULE_PARM_DESC(queue_size_per_kcompressd,
+		"Size of queue for kcompressd");
+
+int schedule_bio_write(void *mem, struct bio *bio, compress_callback cb)
+{
+	int i;
+	bool submit_success = false;
+	size_t sz_work = sizeof(struct write_work);
+
+	struct write_work entry = {
+		.mem = mem,
+		.bio = bio,
+		.cb = cb
+	};
+
+	if (unlikely(!atomic_read(&enable_kcompressd)))
+		return -EBUSY;
+
+	if (!nr_kcompressd || !current_is_kswapd())
+		return -EBUSY;
+
+	bio_get(bio);
+
+	for (i = 0; i < nr_kcompressd; i++) {
+		submit_success =
+			(kfifo_avail(&kcompress[i].write_fifo) >= sz_work) &&
+			(sz_work == kfifo_in(&kcompress[i].write_fifo, &entry, sz_work));
+
+		if (submit_success) {
+			switch (atomic_read(&kcompress[i].running)) {
+			case KCOMPRESSD_NOT_STARTED:
+				atomic_set(&kcompress[i].running, KCOMPRESSD_RUNNING);
+				kcompress[i].kcompressd = kthread_run(kcompressd,
+						&kcompressd_para[i], "kcompressd:%d", i);
+				if (IS_ERR(kcompress[i].kcompressd)) {
+					atomic_set(&kcompress[i].running, KCOMPRESSD_NOT_STARTED);
+					pr_warn("Failed to start kcompressd:%d\n", i);
+					clean_bio_queue(i);
+				}
+				break;
+			case KCOMPRESSD_RUNNING:
+				break;
+			case KCOMPRESSD_SLEEPING:
+				wake_up_interruptible(&kcompress[i].kcompressd_wait);
+				break;
+			}
+			return 0;
+		}
+	}
+
+	bio_put(bio);
+	return -EBUSY;
+}
+EXPORT_SYMBOL(schedule_bio_write);
+
+static int __init kcompressd_init(void)
+{
+	int ret;
+
+	nr_kcompressd = DEFAULT_NR_KCOMPRESSD;
+	queue_size_per_kcompressd = INIT_QUEUE_SIZE;
+
+	ret = kcompress_update();
+	if (ret) {
+		pr_err("Init kcompressd failed!\n");
+		return ret;
+	}
+
+	atomic_set(&enable_kcompressd, true);
+	blocking_notifier_call_chain(&kcompressd_notifier_list, 0, NULL);
+	return 0;
+}
+
+static void __exit kcompressd_exit(void)
+{
+	atomic_set(&enable_kcompressd, false);
+	stop_all_kcompressd_thread();
+
+	kvfree(kcompress);
+	kvfree(kcompressd_para);
+}
+
+module_init(kcompressd_init);
+module_exit(kcompressd_exit);
+
+MODULE_LICENSE("Dual BSD/GPL");
+MODULE_AUTHOR("Qun-Wei Lin <qun-wei.lin@mediatek.com>");
+MODULE_DESCRIPTION("Separate the page compression from the memory reclaiming");
+
diff --git a/drivers/block/zram/kcompressd.h b/drivers/block/zram/kcompressd.h
new file mode 100644
index 000000000000..2fe0b424a7af
--- /dev/null
+++ b/drivers/block/zram/kcompressd.h
@@ -0,0 +1,25 @@ 
+/* SPDX-License-Identifier: GPL-2.0 */
+/*
+ * Copyright (C) 2024 MediaTek Inc.
+ */
+
+#ifndef _KCOMPRESSD_H_
+#define _KCOMPRESSD_H_
+
+#include <linux/rwsem.h>
+#include <linux/kfifo.h>
+#include <linux/atomic.h>
+
+typedef void (*compress_callback)(void *mem, struct bio *bio);
+
+struct kcompress {
+	struct task_struct *kcompressd;
+	wait_queue_head_t kcompressd_wait;
+	struct kfifo write_fifo;
+	atomic_t running;
+};
+
+int kcompressd_enabled(void);
+int schedule_bio_write(void *mem, struct bio *bio, compress_callback cb);
+#endif
+
diff --git a/drivers/block/zram/zram_drv.c b/drivers/block/zram/zram_drv.c
index 2e1a70f2f4bd..bcd63ecb6ff2 100644
--- a/drivers/block/zram/zram_drv.c
+++ b/drivers/block/zram/zram_drv.c
@@ -35,6 +35,7 @@ 
 #include <linux/part_stat.h>
 #include <linux/kernel_read_file.h>
 
+#include "kcompressd.h"
 #include "zram_drv.h"
 
 static DEFINE_IDR(zram_index_idr);
@@ -2240,6 +2241,15 @@  static void zram_bio_write(struct zram *zram, struct bio *bio)
 	bio_endio(bio);
 }
 
+#if IS_ENABLED(CONFIG_KCOMPRESSD)
+static void zram_bio_write_callback(void *mem, struct bio *bio)
+{
+	struct zram *zram = (struct zram *)mem;
+
+	zram_bio_write(zram, bio);
+}
+#endif
+
 /*
  * Handler function for all zram I/O requests.
  */
@@ -2252,6 +2262,10 @@  static void zram_submit_bio(struct bio *bio)
 		zram_bio_read(zram, bio);
 		break;
 	case REQ_OP_WRITE:
+#if IS_ENABLED(CONFIG_KCOMPRESSD)
+		if (kcompressd_enabled() && !schedule_bio_write(zram, bio, zram_bio_write_callback))
+			break;
+#endif
 		zram_bio_write(zram, bio);
 		break;
 	case REQ_OP_DISCARD:
@@ -2535,9 +2549,11 @@  static int zram_add(void)
 #if ZRAM_LOGICAL_BLOCK_SIZE == PAGE_SIZE
 		.max_write_zeroes_sectors	= UINT_MAX,
 #endif
-		.features			= BLK_FEAT_STABLE_WRITES	|
-						  BLK_FEAT_READ_SYNCHRONOUS	|
-						  BLK_FEAT_WRITE_SYNCHRONOUS,
+		.features			= BLK_FEAT_STABLE_WRITES
+						  | BLK_FEAT_READ_SYNCHRONOUS
+#if !IS_ENABLED(CONFIG_KCOMPRESSD)
+						  | BLK_FEAT_WRITE_SYNCHRONOUS,
+#endif
 	};
 	struct zram *zram;
 	int ret, device_id;