
softirq: fix memory corruption when freeing tasklet_struct

Message ID 82b964f0-c2c8-a2c6-5b1f-f3145dc2c8e5@redhat.com (mailing list archive)
State Rejected, archived
Delegated to: Mike Snitzer
Series softirq: fix memory corruption when freeing tasklet_struct

Commit Message

Mikulas Patocka Jan. 25, 2024, 6:29 p.m. UTC
Hi

There's a problem with the tasklet API - there is no reliable way to
free a structure that contains tasklet_struct. The problem is that the
function tasklet_action_common calls tasklet_unlock(t) after it has called
the callback. If the callback does something that frees the tasklet_struct,
tasklet_unlock(t) writes into freed memory.

dm-crypt does this - it schedules a tasklet with tasklet_schedule, it does
encryption inside the tasklet handler (because it performs better than
doing the encryption in a workqueue), then it submits a workqueue entry
and calls bio_endio from the workqueue entry.

However, if the workqueue preempts ksoftirqd, this race condition happens:

ksoftirqd:
* tasklet_action_common
* t->func(t->data)	(that points to kcryptd_crypt_tasklet)
* kcryptd_crypt_tasklet
* kcryptd_crypt
* kcryptd_crypt_read_convert
* crypt_dec_pending
* queue_work(cc->io_queue, &io->work);
now we switch to the workqueue process:
* kcryptd_io_bio_endio
* bio_endio(io->base_bio)	(this calls clone_endio)
* clone_endio
* free_tio
* bio_put(clone) - the bio is freed
now we switch back to ksoftirqd:
* tasklet_action_common calls tasklet_unlock(t)
* tasklet_unlock(t) touches memory that was already freed when the bio was freed
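
For reference, the window is visible in the main loop of tasklet_action_common().
The following is a simplified sketch of current kernel/softirq.c, trimmed to the
relevant part:

	while (list) {
		struct tasklet_struct *t = list;

		list = list->next;

		if (tasklet_trylock(t)) {
			if (!atomic_read(&t->count)) {
				if (tasklet_clear_sched(t)) {
					if (t->use_callback)
						t->callback(t); /* may free the struct containing *t ... */
					else
						t->func(t->data);
				}
				tasklet_unlock(t);      /* ... but this still writes to t->state */
				continue;
			}
			tasklet_unlock(t);
		}
		/* (re-queueing on trylock failure omitted) */
	}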

dm-verity has a similar problem.

In order to fix this bug, I propose adding a new flag,
TASKLET_STATE_ONESHOT. The flag indicates that the tasklet will be
submitted only once, and it prevents tasklet_action_common from touching
the tasklet after the callback has completed.
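
The actual hunks are not reproduced in this excerpt; roughly, the idea would
look like the following sketch (hypothetical, reconstructed from the
description above rather than taken from the patch itself):

	/* include/linux/interrupt.h */
	enum {
		TASKLET_STATE_SCHED,	/* Tasklet is scheduled for execution */
		TASKLET_STATE_RUN,	/* Tasklet is running (SMP only) */
		TASKLET_STATE_ONESHOT	/* Tasklet is submitted only once and may be freed by its callback */
	};

	/* kernel/softirq.c, inside the tasklet_action_common() loop */
	if (tasklet_clear_sched(t)) {
		/* must be sampled before the callback, which may free *t */
		bool oneshot = test_bit(TASKLET_STATE_ONESHOT, &t->state);

		if (t->use_callback)
			t->callback(t);
		else
			t->func(t->data);

		if (oneshot)
			continue;	/* *t may be gone - don't unlock it */
	}
	tasklet_unlock(t);
	continue;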

If you have another idea how to solve this bug, let me know.

Signed-off-by: Mikulas Patocka <mpatocka@redhat.com>
Fixes: 39d42fa96ba1 ("dm crypt: add flags to optionally bypass kcryptd workqueues")
Fixes: 5721d4e5a9cd ("dm verity: Add optional "try_verify_in_tasklet" feature")
Cc: stable@vger.kernel.org	# v5.9+

---
 drivers/md/dm-crypt.c         |    1 +
 drivers/md/dm-verity-target.c |    1 +
 include/linux/interrupt.h     |    9 ++++++++-
 kernel/softirq.c              |   22 +++++++++++++++-------
 4 files changed, 25 insertions(+), 8 deletions(-)

Comments

Linus Torvalds Jan. 25, 2024, 7:51 p.m. UTC | #1
On Thu, 25 Jan 2024 at 10:30, Mikulas Patocka <mpatocka@redhat.com> wrote:
>
> There's a problem with the tasklet API - there is no reliable way to
> free a structure that contains tasklet_struct. The problem is that the
> function tasklet_action_common calls tasklet_unlock(t) after it has called
> the callback. If the callback does something that frees the tasklet_struct,
> tasklet_unlock(t) writes into freed memory.

Ugh.

I see what you're doing, but I have to say, I dislike this patch
immensely. It feels like a serious misdesign that is then papered over
with a hack.

I'd much rather see us trying to move away from tasklets entirely in
cases like this. Just say "you cannot do that".

In fact, of the two cases that want this new functionality, at least
dm-verity already makes tasklets a conditional feature that isn't even
enabled by default, and that was only introduced in the last couple of
years.

So I think dm-verity would be better off just removing tasklet use,
and we should check whether there are better models for handling the
latency issue.

The dm-crypt.c case looks different, but similar. I'm not sure why it
doesn't just use the workqueue for the "in interrupt" case. Like
dm-verity, it already does have a workqueue option, and it's a
setup-time option to say "don't use the workqueue for reads / writes".
But it feels like the code should just say "tough luck, in interrupt
context we *will* use workqueues".

So honestly, both of the cases you bring up seem to be just BUGGY. The
fix is not to extend tasklets to a new thing, the fix is to say "those
two uses of tasklets were broken, and should go away".

End result: I would suggest:

 - just get rid of the actively buggy use of tasklets. It's not
necessary in either case.

 - look at introducing a "low-latency atomic workqueue" that looks
*exactly* like a regular workqueue, but has the rule that it's per-cpu
and functions on it cannot sleep

because I think one common issue with workqueues - which are better
designed than tasklets - is that scheduling latency.

I think if we introduced a workqueue that worked more like a tasklet -
in that it's run in softirq context - but doesn't have the interface
mistakes of tasklets, a number of existing workqueue users might
decide that that is exactly what they want.

So we could have a per-cpu 'atomic_wq' that things can be scheduled
on, and that runs from softirqs just like tasklets, and shares the
workqueue queueing infrastructure but doesn't use the workqueue
threads.
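
From a driver's point of view that would be used exactly like a normal
workqueue. A hypothetical sketch (system_atomic_wq is the name used in
Tejun's draft further down in the thread; everything else is made up):

	struct my_ctx {
		struct work_struct work;
		int result;
	};

	/* runs from softirq context: must not sleep */
	static void my_irq_time_work(struct work_struct *work)
	{
		struct my_ctx *ctx = container_of(work, struct my_ctx, work);

		pr_debug("finishing ctx %p, result %d\n", ctx, ctx->result);
	}

	/* called from hard-irq context, e.g. a completion handler */
	static void my_complete(struct my_ctx *ctx)
	{
		INIT_WORK(&ctx->work, my_irq_time_work);
		queue_work(system_atomic_wq, &ctx->work);
	}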

Yes, the traditional use of workqueues is to be able to sleep and do
things in process context, so that sounds a bit odd, but let's face
it, we

 (a) already have multiple classes of workqueues

 (b) avoiding deep - and possibly recursive - stack depths is another
reason people use workqueues

 (c) avoiding interrupt context is a real concern, even if you don't
want to sleep

and I really *really* would like to get rid of tasklets entirely.

They started as this very specific hardcoded softirq thing used by
some drivers, and then the notion was generalized.

And I think it was generalized badly, as shown by this example.

I have added Tejun to the cc, so that he can throw his hands up in
horror and say "Linus, you're crazy, your drug-fueled idea would be
horrid because of Xyz".

But *maybe* Tejun has been taking the same drugs I have, and goes
"yeah, that would fit well".

Tejun? Please tell me I'm not on some bad crack..

               Linus
Mikulas Patocka Jan. 25, 2024, 10:04 p.m. UTC | #2
On Thu, 25 Jan 2024, Linus Torvalds wrote:

> On Thu, 25 Jan 2024 at 10:30, Mikulas Patocka <mpatocka@redhat.com> wrote:
> >
> > There's a problem with the tasklet API - there is no reliable way to
> > free a structure that contains tasklet_struct. The problem is that the
> > function tasklet_action_common calls tasklet_unlock(t) after it has called
> > the callback. If the callback does something that frees the tasklet_struct,
> > tasklet_unlock(t) writes into freed memory.
> 
> Ugh.
> 
> I see what you're doing, but I have to say, I dislike this patch
> immensely. It feels like a serious misdesign that is then papered over
> with a hack.
> 
> I'd much rather see us trying to move away from tasklets entirely in
> cases like this. Just say "you cannot do that".

OK. I will delete tasklets from both dm-crypt and dm-verity - it will 
simplify them quite a bit.

BTW. Do you think that we should get rid of request-based device mapper as 
well? (that's another thing that looks like code bloat to me)

Mikulas
Ignat Korchagin Jan. 25, 2024, 10:51 p.m. UTC | #3
On Thu, Jan 25, 2024 at 7:51 PM Linus Torvalds
<torvalds@linux-foundation.org> wrote:
>
> On Thu, 25 Jan 2024 at 10:30, Mikulas Patocka <mpatocka@redhat.com> wrote:
> >
> > There's a problem with the tasklet API - there is no reliable way to
> > free a structure that contains tasklet_struct. The problem is that the
> > function tasklet_action_common calls tasklet_unlock(t) after it has called
> > the callback. If the callback does something that frees the tasklet_struct,
> > tasklet_unlock(t) writes into freed memory.
>
> Ugh.
>
> I see what you're doing, but I have to say, I dislike this patch
> immensely. It feels like a serious misdesign that is then papered over
> with a hack.
>
> I'd much rather see us trying to move away from tasklets entirely in
> cases like this. Just say "you cannot do that".
>
> In fact, of the two cases that want this new functionality, at least
> dm-verity already makes tasklets a conditional feature that isn't even
> enabled by default, and that was only introduced in the last couple of
> years.
>
> So I think dm-verity would be better off just removing tasklet use,
> and we should check whether there are better models for handling the
> latency issue.
>
> The dm-crypt.c case looks different, but similar. I'm not sure why it
> doesn't just use the workqueue for the "in interrupt" case. Like
> dm-verity, it already does have a workqueue option, and it's a
> setup-time option to say "don't use the workqueue for reads / writes".
> But it feels like the code should just say "tough luck, in interrupt
> context we *will* use workqueues".

This is not great given the following context: the tasklet/interrupt
code was added when we discovered that dm-crypt most likely processes
read bios in interrupt context when using NVMe drives. So this would
penalise the most desirable case for fast disk encryption (decryption,
that is): systems use NVMe drives mostly to get fast IO, so they are
the most sensitive to any latency introduced by dm-crypt. For us it was
a go/no-go: without this we might have switched to dodgy proprietary
self-encrypting drives.

On the other hand, this code never uses tasklets for slower media, and
those setups don't care anyway and can use the default workqueue option
in the first place.

> So honestly, both of the cases you bring up seem to be just BUGGY. The
> fix is not to extend tasklets to a new thing, the fix is to say "those
> two uses of tasklets were broken, and should go away".
>
> End result: I would suggest:
>
>  - just get rid of the actively buggy use of tasklets. It's not
> necessary in either case.
>
>  - look at introducing a "low-latency atomic workqueue" that looks
> *exactly* like a regular workqueue, but has the rule that it's per-cpu
> and functions on it cannot sleep
>
> because I think one common issue with workqueues - which are better
> designed than tasklets - is that scheduling latency.
>
> I think if we introduced a workqueue that worked more like a tasklet -
> in that it's run in softirq context - but doesn't have the interface
> mistakes of tasklets, a number of existing workqueue users might
> decide that that is exactly what they want.
>
> So we could have a per-cpu 'atomic_wq' that things can be scheduled
> on, and that runs from softirqs just like tasklets, and shares the
> workqueue queueing infrastructure but doesn't use the workqueue
> threads.
>
> Yes, the traditional use of workqueues is to be able to sleep and do
> things in process context, so that sounds a bit odd, but let's face
> it, we
>
>  (a) already have multiple classes of workqueues
>
>  (b) avoiding deep - and possibly recursive - stack depths is another
> reason people use workqueues
>
>  (c) avoiding interrupt context is a real concern, even if you don't
> want to sleep
>
> and I really *really* would like to get rid of tasklets entirely.
>
> They started as this very specific hardcoded softirq thing used by
> some drivers, and then the notion was generalized.
>
> And I think it was generalized badly, as shown by this example.
>
> I have added Tejun to the cc, so that he can throw his hands up in
> horror and say "Linus, you're crazy, your drug-fueled idea would be
> horrid because of Xyz".
>
> But *maybe* Tejun has been taking the same drugs I have, and goes
> "yeah, that would fit well".
>
> Tejun? Please tell me I'm not on some bad crack..
>
>                Linus
Damien Le Moal Jan. 25, 2024, 11:15 p.m. UTC | #4
On 1/26/24 07:04, Mikulas Patocka wrote:
> 
> 
> On Thu, 25 Jan 2024, Linus Torvalds wrote:
> 
>> On Thu, 25 Jan 2024 at 10:30, Mikulas Patocka <mpatocka@redhat.com> wrote:
>>>
>>> There's a problem with the tasklet API - there is no reliable way to
>>> free a structure that contains tasklet_struct. The problem is that the
>>> function tasklet_action_common calls tasklet_unlock(t) after it has called
>>> the callback. If the callback does something that frees the tasklet_struct,
>>> tasklet_unlock(t) writes into freed memory.
>>
>> Ugh.
>>
>> I see what you're doing, but I have to say, I dislike this patch
>> immensely. It feels like a serious misdesign that is then papered over
>> with a hack.
>>
>> I'd much rather see us trying to move away from tasklets entirely in
>> cases like this. Just say "you cannot do that".
> 
> OK. I will delete tasklets from both dm-crypt and dm-verity - it will 
> simplify them quite a bit.
> 
> BTW. Do you think that we should get rid of request-based device mapper as 
> well? (that's another thing that looks like code bloat to me)

That would force removing dm-multipath, which is I think the only DM driver
using requests. But given how widespread the use of dm-multipath is, killing it
would likely make a lot of people unhappy...
Tejun Heo Jan. 26, 2024, 12:33 a.m. UTC | #5
Hello, Linus.

On Thu, Jan 25, 2024 at 11:51:28AM -0800, Linus Torvalds wrote:
...
> So we could have a per-cpu 'atomic_wq' that things can be scheduled
> on, and that runs from softirqs just like tasklets, and shares the
> workqueue queueing infrastructure but doesn't use the workqueue
> threads.
> 
> Yes, the traditional use of workqueues is to be able to sleep and do
> things in process context, so that sounds a bit odd, but let's face
> it, we
> 
>  (a) already have multiple classes of workqueues
> 
>  (b) avoiding deep - and possibly recursive - stack depths is another
> reason people use workqueues
> 
>  (c) avoiding interrupt context is a real concern, even if you don't
> want to sleep
> 
> and I really *really* would like to get rid of tasklets entirely.
> 
> They started as this very specific hardcoded softirq thing used by
> some drivers, and then the notion was generalized.
> 
> And I think it was generalized badly, as shown by this example.
> 
> I have added Tejun to the cc, so that he can throw his hands up in
> horror and say "Linus, you're crazy, your drug-fueled idea would be
> horrid because of Xyz".
> 
> But *maybe* Tejun has been taking the same drugs I have, and goes
> "yeah, that would fit well".
> 
> Tejun? Please tell me I'm not on some bad crack..

That doesn't sound too crazy to me. I need to think more about how flush /
cancel paths would look but hopefully it won't be too bad. I'll hack up
something and see how things look.

Thanks.
Hannes Reinecke Jan. 26, 2024, 6:56 a.m. UTC | #6
On 1/26/24 00:15, Damien Le Moal wrote:
> On 1/26/24 07:04, Mikulas Patocka wrote:
>>
>>
>> On Thu, 25 Jan 2024, Linus Torvalds wrote:
>>
>>> On Thu, 25 Jan 2024 at 10:30, Mikulas Patocka <mpatocka@redhat.com> wrote:
>>>>
>>>> There's a problem with the tasklet API - there is no reliable way to
>>>> free a structure that contains tasklet_struct. The problem is that the
>>>> function tasklet_action_common calls tasklet_unlock(t) after it has called
>>>> the callback. If the callback does something that frees the tasklet_struct,
>>>> tasklet_unlock(t) writes into freed memory.
>>>
>>> Ugh.
>>>
>>> I see what you're doing, but I have to say, I dislike this patch
>>> immensely. It feels like a serious misdesign that is then papered over
>>> with a hack.
>>>
>>> I'd much rather see us trying to move away from tasklets entirely in
>>> cases like this. Just say "you cannot do that".
>>
>> OK. I will delete tasklets from both dm-crypt and dm-verity - it will
>> simplify them quite a bit.
>>
>> BTW. Do you think that we should get rid of request-based device mapper as
>> well? (that's another thing that looks like code bloat to me)
> 
> That would force removing dm-multipath, which is I think the only DM driver
> using requests. But given how widespread the use of dm-multipath is, killing it
> would likely make a lot of people unhappy...
> 
Oh, it's this time of the year again?
(This topic regularly comes up ...)

The reason is not that it will disable dm-multipath (Mike Snitzer put in 
bio-based multipathing as an additional code path); the reason is that 
dm-multipath performance will suffer when you remove request-based DM.

DM-multipath schedules based on requests (if you use the request-based
interface) or on bios (if you use the bio-based interface).
Any merge decision is typically done by the block layer when combining 
bios into requests; and you can only merge bios if the bvecs are adjacent.
So if you use bio-based multipathing you will spread sequential bios
across all paths, leaving the block layer unable to merge requests.
For request based multipathing the requests are already fully-formed,
and scheduling across paths does not change them.
Things are slightly better with multi-page bvecs nowadays, but the
overall picture still stands.

Another thing is timeouts; bios don't do timeouts, so a bio can run
for an arbitrary time with no chance of interrupting it.
Requests do have a timeout, and will be aborted from the driver when
the timeout is hit.
Seeing that 99% of all I/O issues I've seen _are_ timeouts it becomes
a crucial feature if you want dm-multipath to control failover time.
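
(For comparison, the per-request timeout hook that request-based drivers
get from blk-mq looks roughly like the following; the driver names are
hypothetical:)

	static enum blk_eh_timer_return my_mq_timeout(struct request *rq)
	{
		/* abort or retry the command, e.g. so dm-multipath can fail over */
		return BLK_EH_DONE;	/* or BLK_EH_RESET_TIMER to re-arm the timer */
	}

	static const struct blk_mq_ops my_mq_ops = {
		/* .queue_rq and friends omitted */
		.timeout	= my_mq_timeout,
	};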

Cheers,

Hannes
Mikulas Patocka Jan. 26, 2024, 6:09 p.m. UTC | #7
On Fri, 26 Jan 2024, Hannes Reinecke wrote:

> Oh, it's this time of the year again?
> (This topic regularly comes up ...)
> 
> The reason is not that it will disable dm-multipath (Mike Snitzer put in
> bio-based multipathing as an additional code path); the reason is that
> dm-multipath performance will suffer when you remove request-based DM.

Is there some benchmark that shows how much it will suffer?

> DM-multipath schedules based on request (if you use the request-based
> interface) or bios (if you use the bio-based interface).
> Any merge decision is typically done by the block layer when combining bios
> into requests; and you can only merge bios if the bvecs are adjacent.
> So if you use bio-based multipathing you will spread sequential bios
> across all paths, leaving the block layer unable to merge requests.

The same problem exists in raid1, and there's a function read_balance that
solves it: if the starting sector of a new bio matches the ending sector
of a previous bio, the bio is submitted to the same device.
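
A minimal sketch of that heuristic (all names here are hypothetical;
md/raid1's read_balance() has the real thing):

	struct mp_path;				/* one underlying path, details elided */

	struct mpath_state {
		sector_t	last_end_sector;
		struct mp_path	*last_path;
	};

	static struct mp_path *select_idlest_path(struct mpath_state *ms);	/* hypothetical */

	static struct mp_path *choose_path(struct mpath_state *ms, struct bio *bio)
	{
		/* sequential with the previous bio: keep the stream on one path */
		if (bio->bi_iter.bi_sector == ms->last_end_sector)
			return ms->last_path;

		ms->last_path = select_idlest_path(ms);
		return ms->last_path;
	}

	/* the caller updates ms->last_end_sector = bio_end_sector(bio) after mapping the bio */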

> For request based multipathing the requests are already fully-formed,
> and scheduling across paths does not change them.
> Things are slightly better with multi-page bvecs nowadays, but the
> overall picture still stands.
> 
> Another thing is timeouts; bios don't do timeouts, so a bio can run
> for an arbitrary time with no chance of interrupting it.
> Requests do have a timeout, and will be aborted from the driver when
> the timeout is hit.
> Seeing that 99% of all I/O issues I've seen _are_ timeouts it becomes
> a crucial feature if you want dm-multipath to control failover time.

You can set the timeout on the underlying physical devices.

> Cheers,
> 
> Hannes

Mikulas
Allen Jan. 26, 2024, 6:36 p.m. UTC | #8
> > There's a problem with the tasklet API - there is no reliable way to
> > free a structure that contains tasklet_struct. The problem is that the
> > function tasklet_action_common calls tasklet_unlock(t) after it has called
> > the callback. If the callback does something that frees the tasklet_struct,
> > tasklet_unlock(t) writes into freed memory.
>
> Ugh.
>
> I see what you're doing, but I have to say, I dislike this patch
> immensely. It feels like a serious misdesign that is then papered over
> with a hack.
>
> I'd much rather see us trying to move away from tasklets entirely in
> cases like this. Just say "you cannot do that".
>
 The idea of moving away from using tasklets has been discussed several times.
I am working on entirely moving away from using tasklets. Of course, we have
some subsystems (like DMA) where we need to do a little more.

> In fact, of the two cases that want this new functionality, at least
> dm-verity already makes tasklets a conditional feature that isn't even
> enabled by default, and that was only introduced in the last couple of
> years.
>
> So I think dm-verity would be better off just removing tasklet use,
> and we should check whether there are better models for handling the
> latency issue.
>
> The dm-crypt.c case looks different, but similar. I'm not sure why it
> doesn't just use the workqueue for the "in interrupt" case. Like
> dm-verity, it already does have a workqueue option, and it's a
> setup-time option to say "don't use the workqueue for reads / writes".
> But it feels like the code should just say "tough luck, in interrupt
> context we *will* use workqueues".
>
> So honestly, both of the cases you bring up seem to be just BUGGY. The
> fix is not to extend tasklets to a new thing, the fix is to say "those
> two uses of tasklets were broken, and should go away".
>
> End result: I would suggest:
>
>  - just get rid of the actively buggy use of tasklets. It's not
> necessary in either case.
>
>  - look at introducing a "low-latency atomic workqueue" that looks
> *exactly* like a regular workqueue, but has the rule that it's per-cpu
> and functions on it cannot sleep
>
> because I think one common issue with workqueues - which are better
> designed than tasklets - is that scheduling latency.
>
> I think if we introduced a workqueue that worked more like a tasklet -
> in that it's run in softirq context - but doesn't have the interface
> mistakes of tasklets, a number of existing workqueue users might
> decide that that is exactly what they want.
>
> So we could have a per-cpu 'atomic_wq' that things can be scheduled
> on, and that runs from softirqs just like tasklets, and shares the
> workqueue queueing infrastructure but doesn't use the workqueue
> threads.
>
> Yes, the traditional use of workqueues is to be able to sleep and do
> things in process context, so that sounds a bit odd, but let's face
> it, we
>
>  (a) already have multiple classes of workqueues
>
>  (b) avoiding deep - and possibly recursive - stack depths is another
> reason people use workqueues
>
>  (c) avoiding interrupt context is a real concern, even if you don't
> want to sleep
>
> and I really *really* would like to get rid of tasklets entirely.
>
> They started as this very specific hardcoded softirq thing used by
> some drivers, and then the notion was generalized.
>
> And I think it was generalized badly, as shown by this example.
>
> I have added Tejun to the cc, so that he can throw his hands up in
> horror and say "Linus, you're crazy, your drug-fueled idea would be
> horrid because of Xyz".
>
> But *maybe* Tejun has been taking the same drugs I have, and goes
> "yeah, that would fit well".
>
> Tejun? Please tell me I'm not on some bad crack..
>
>                Linus
>
       - Allen
Tejun Heo Jan. 26, 2024, 11:43 p.m. UTC | #9
Hello,

The following is a draft patch which implements atomic workqueues and
convert dm-crypt to use it instead of tasklet. It's an early draft and very
lightly tested but seems to work more or less. It's on top of wq/for6.9 + a
pending patchset. The following git branch can be used for testing.

  git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq.git wq-atomic-draft

I'll go over it to make sure all the pieces work. While it adds some
complications, it doesn't seem too bad and conversion from tasklet should be
straightforward too.

- It hooks into tasklet[_hi] for now but if we get to update all of tasklet
  users, we can just repurpose the tasklet softirq slots directly.

- I thought about allowing busy-waits for flushes and cancels but it didn't
  seem necessary. Keeping them blocking has the benefit of avoiding possible
  nasty deadlocks. We can revisit if there's need.

- Compared to tasklets, each work item goes through a bit more management
  code because I wanted to keep the code as unified as possible with regular
  threaded workqueues. That said, it's not a huge amount and my bet is that
  the difference is unlikely to be noticeable.

Thanks.

From 8224d2602ef454ca164f4added765dc4dddd5e16 Mon Sep 17 00:00:00 2001
From: Tejun Heo <tj@kernel.org>
Date: Fri, 26 Jan 2024 13:21:42 -1000
Subject: [PATCH] workqueue: DRAFT: Implement atomic workqueue and convert
 dmcrypt to use it

---
 drivers/md/dm-crypt.c       |  36 +-----
 include/linux/workqueue.h   |   6 +
 kernel/workqueue.c          | 234 +++++++++++++++++++++++++++---------
 kernel/workqueue_internal.h |   3 +
 4 files changed, 186 insertions(+), 93 deletions(-)

diff --git a/drivers/md/dm-crypt.c b/drivers/md/dm-crypt.c
index 855b482cbff1..d375285db202 100644
--- a/drivers/md/dm-crypt.c
+++ b/drivers/md/dm-crypt.c
@@ -73,11 +73,8 @@ struct dm_crypt_io {
 	struct bio *base_bio;
 	u8 *integrity_metadata;
 	bool integrity_metadata_from_pool:1;
-	bool in_tasklet:1;
 
 	struct work_struct work;
-	struct tasklet_struct tasklet;
-
 	struct convert_context ctx;
 
 	atomic_t io_pending;
@@ -1762,7 +1759,6 @@ static void crypt_io_init(struct dm_crypt_io *io, struct crypt_config *cc,
 	io->ctx.r.req = NULL;
 	io->integrity_metadata = NULL;
 	io->integrity_metadata_from_pool = false;
-	io->in_tasklet = false;
 	atomic_set(&io->io_pending, 0);
 }
 
@@ -1771,13 +1767,6 @@ static void crypt_inc_pending(struct dm_crypt_io *io)
 	atomic_inc(&io->io_pending);
 }
 
-static void kcryptd_io_bio_endio(struct work_struct *work)
-{
-	struct dm_crypt_io *io = container_of(work, struct dm_crypt_io, work);
-
-	bio_endio(io->base_bio);
-}
-
 /*
  * One of the bios was finished. Check for completion of
  * the whole request and correctly clean up the buffer.
@@ -1800,21 +1789,6 @@ static void crypt_dec_pending(struct dm_crypt_io *io)
 		kfree(io->integrity_metadata);
 
 	base_bio->bi_status = error;
-
-	/*
-	 * If we are running this function from our tasklet,
-	 * we can't call bio_endio() here, because it will call
-	 * clone_endio() from dm.c, which in turn will
-	 * free the current struct dm_crypt_io structure with
-	 * our tasklet. In this case we need to delay bio_endio()
-	 * execution to after the tasklet is done and dequeued.
-	 */
-	if (io->in_tasklet) {
-		INIT_WORK(&io->work, kcryptd_io_bio_endio);
-		queue_work(cc->io_queue, &io->work);
-		return;
-	}
-
 	bio_endio(base_bio);
 }
 
@@ -2246,11 +2220,6 @@ static void kcryptd_crypt(struct work_struct *work)
 		kcryptd_crypt_write_convert(io);
 }
 
-static void kcryptd_crypt_tasklet(unsigned long work)
-{
-	kcryptd_crypt((struct work_struct *)work);
-}
-
 static void kcryptd_queue_crypt(struct dm_crypt_io *io)
 {
 	struct crypt_config *cc = io->cc;
@@ -2263,9 +2232,8 @@ static void kcryptd_queue_crypt(struct dm_crypt_io *io)
 		 * it is being executed with irqs disabled.
 		 */
 		if (in_hardirq() || irqs_disabled()) {
-			io->in_tasklet = true;
-			tasklet_init(&io->tasklet, kcryptd_crypt_tasklet, (unsigned long)&io->work);
-			tasklet_schedule(&io->tasklet);
+			INIT_WORK(&io->work, kcryptd_crypt);
+			queue_work(system_atomic_wq, &io->work);
 			return;
 		}
 
diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 232baea90a1d..1e4938b5b176 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -353,6 +353,7 @@ static inline unsigned int work_static(struct work_struct *work) { return 0; }
  * Documentation/core-api/workqueue.rst.
  */
 enum wq_flags {
+	WQ_ATOMIC		= 1 << 0, /* execute in softirq context */
 	WQ_UNBOUND		= 1 << 1, /* not bound to any cpu */
 	WQ_FREEZABLE		= 1 << 2, /* freeze during suspend */
 	WQ_MEM_RECLAIM		= 1 << 3, /* may be used for memory reclaim */
@@ -392,6 +393,9 @@ enum wq_flags {
 	__WQ_ORDERED		= 1 << 17, /* internal: workqueue is ordered */
 	__WQ_LEGACY		= 1 << 18, /* internal: create*_workqueue() */
 	__WQ_ORDERED_EXPLICIT	= 1 << 19, /* internal: alloc_ordered_workqueue() */
+
+	/* atomic wq only allows the following flags */
+	__WQ_ATOMIC_ALLOWS	= WQ_ATOMIC | WQ_HIGHPRI,
 };
 
 enum wq_consts {
@@ -442,6 +446,8 @@ extern struct workqueue_struct *system_unbound_wq;
 extern struct workqueue_struct *system_freezable_wq;
 extern struct workqueue_struct *system_power_efficient_wq;
 extern struct workqueue_struct *system_freezable_power_efficient_wq;
+extern struct workqueue_struct *system_atomic_wq;
+extern struct workqueue_struct *system_atomic_highpri_wq;
 
 /**
  * alloc_workqueue - allocate a workqueue
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 23740c9ed57a..2a8f21494676 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -73,7 +73,8 @@ enum worker_pool_flags {
 	 * wq_pool_attach_mutex to avoid changing binding state while
 	 * worker_attach_to_pool() is in progress.
 	 */
-	POOL_MANAGER_ACTIVE	= 1 << 0,	/* being managed */
+	POOL_ATOMIC		= 1 << 0,	/* is an atomic pool */
+	POOL_MANAGER_ACTIVE	= 1 << 1,	/* being managed */
 	POOL_DISASSOCIATED	= 1 << 2,	/* cpu can't serve workers */
 };
 
@@ -115,6 +116,14 @@ enum wq_internal_consts {
 	WQ_NAME_LEN		= 32,
 };
 
+/*
+ * We don't want to trap softirq for too long. See MAX_SOFTIRQ_TIME and
+ * MAX_SOFTIRQ_RESTART in kernel/softirq.c. These are macros because
+ * msecs_to_jiffies() can't be an initializer.
+ */
+#define ATOMIC_WORKER_JIFFIES	msecs_to_jiffies(2)
+#define ATOMIC_WORKER_RESTARTS	10
+
 /*
  * Structure fields follow one of the following exclusion rules.
  *
@@ -441,8 +450,13 @@ static bool wq_debug_force_rr_cpu = false;
 #endif
 module_param_named(debug_force_rr_cpu, wq_debug_force_rr_cpu, bool, 0644);
 
+/* the atomic worker pools */
+static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
+				     atomic_worker_pools);
+
 /* the per-cpu worker pools */
-static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS], cpu_worker_pools);
+static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
+				     cpu_worker_pools);
 
 static DEFINE_IDR(worker_pool_idr);	/* PR: idr of all pools */
 
@@ -476,8 +490,13 @@ struct workqueue_struct *system_power_efficient_wq __ro_after_init;
 EXPORT_SYMBOL_GPL(system_power_efficient_wq);
 struct workqueue_struct *system_freezable_power_efficient_wq __ro_after_init;
 EXPORT_SYMBOL_GPL(system_freezable_power_efficient_wq);
+struct workqueue_struct *system_atomic_wq;
+EXPORT_SYMBOL_GPL(system_atomic_wq);
+struct workqueue_struct *system_atomic_highpri_wq;
+EXPORT_SYMBOL_GPL(system_atomic_highpri_wq);
 
 static int worker_thread(void *__worker);
+static void atomic_worker_taskletfn(struct tasklet_struct *tasklet);
 static void workqueue_sysfs_unregister(struct workqueue_struct *wq);
 static void show_pwq(struct pool_workqueue *pwq);
 static void show_one_worker_pool(struct worker_pool *pool);
@@ -496,6 +515,11 @@ static void show_one_worker_pool(struct worker_pool *pool);
 			 !lockdep_is_held(&wq_pool_mutex),		\
 			 "RCU, wq->mutex or wq_pool_mutex should be held")
 
+#define for_each_atomic_worker_pool(pool, cpu)				\
+	for ((pool) = &per_cpu(atomic_worker_pools, cpu)[0];		\
+	     (pool) < &per_cpu(atomic_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \
+	     (pool)++)
+
 #define for_each_cpu_worker_pool(pool, cpu)				\
 	for ((pool) = &per_cpu(cpu_worker_pools, cpu)[0];		\
 	     (pool) < &per_cpu(cpu_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \
@@ -1184,6 +1208,14 @@ static bool kick_pool(struct worker_pool *pool)
 	if (!need_more_worker(pool) || !worker)
 		return false;
 
+	if (pool->flags & POOL_ATOMIC) {
+		if (pool->attrs->nice == HIGHPRI_NICE_LEVEL)
+			tasklet_hi_schedule(&worker->atomic_tasklet);
+		else
+			tasklet_schedule(&worker->atomic_tasklet);
+		return true;
+	}
+
 	p = worker->task;
 
 #ifdef CONFIG_SMP
@@ -1663,8 +1695,15 @@ static bool pwq_tryinc_nr_active(struct pool_workqueue *pwq, bool fill)
 	lockdep_assert_held(&pool->lock);
 
 	if (!nna) {
-		/* per-cpu workqueue, pwq->nr_active is sufficient */
-		obtained = pwq->nr_active < READ_ONCE(wq->max_active);
+		/*
+		 * An atomic workqueue always have a single worker per-cpu and
+		 * doesn't impose additional max_active limit. For a per-cpu
+		 * workqueue, checking pwq->nr_active is sufficient.
+		 */
+		if (wq->flags & WQ_ATOMIC)
+			obtained = true;
+		else
+			obtained = pwq->nr_active < READ_ONCE(wq->max_active);
 		goto out;
 	}
 
@@ -2591,27 +2630,31 @@ static struct worker *create_worker(struct worker_pool *pool)
 
 	worker->id = id;
 
-	if (pool->cpu >= 0)
-		snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
-			 pool->attrs->nice < 0  ? "H" : "");
-	else
-		snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);
-
-	worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
-					      "kworker/%s", id_buf);
-	if (IS_ERR(worker->task)) {
-		if (PTR_ERR(worker->task) == -EINTR) {
-			pr_err("workqueue: Interrupted when creating a worker thread \"kworker/%s\"\n",
-			       id_buf);
-		} else {
-			pr_err_once("workqueue: Failed to create a worker thread: %pe",
-				    worker->task);
+	if (pool->flags & POOL_ATOMIC) {
+		tasklet_setup(&worker->atomic_tasklet, atomic_worker_taskletfn);
+	} else {
+		if (pool->cpu >= 0)
+			snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
+				 pool->attrs->nice < 0  ? "H" : "");
+		else
+			snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);
+
+		worker->task = kthread_create_on_node(worker_thread, worker,
+					pool->node, "kworker/%s", id_buf);
+		if (IS_ERR(worker->task)) {
+			if (PTR_ERR(worker->task) == -EINTR) {
+				pr_err("workqueue: Interrupted when creating a worker thread \"kworker/%s\"\n",
+				       id_buf);
+			} else {
+				pr_err_once("workqueue: Failed to create a worker thread: %pe",
+					    worker->task);
+			}
+			goto fail;
 		}
-		goto fail;
-	}
 
-	set_user_nice(worker->task, pool->attrs->nice);
-	kthread_bind_mask(worker->task, pool_allowed_cpus(pool));
+		set_user_nice(worker->task, pool->attrs->nice);
+		kthread_bind_mask(worker->task, pool_allowed_cpus(pool));
+	}
 
 	/* successful, attach the worker to the pool */
 	worker_attach_to_pool(worker, pool);
@@ -2627,7 +2670,8 @@ static struct worker *create_worker(struct worker_pool *pool)
 	 * check if not woken up soon. As kick_pool() is noop if @pool is empty,
 	 * wake it up explicitly.
 	 */
-	wake_up_process(worker->task);
+	if (worker->task)
+		wake_up_process(worker->task);
 
 	raw_spin_unlock_irq(&pool->lock);
 
@@ -3043,25 +3087,35 @@ __acquires(&pool->lock)
 	lock_map_release(&lockdep_map);
 	lock_map_release(&pwq->wq->lockdep_map);
 
-	if (unlikely(in_atomic() || lockdep_depth(current) > 0 ||
-		     rcu_preempt_depth() > 0)) {
-		pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d/%d\n"
-		       "     last function: %ps\n",
-		       current->comm, preempt_count(), rcu_preempt_depth(),
-		       task_pid_nr(current), worker->current_func);
-		debug_show_held_locks(current);
-		dump_stack();
-	}
+	if (worker->task) {
+		if (unlikely(in_atomic() || lockdep_depth(current) > 0 ||
+			     rcu_preempt_depth() > 0)) {
+			pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d/%d\n"
+			       "     last function: %ps\n",
+			       current->comm, preempt_count(),
+			       rcu_preempt_depth(), task_pid_nr(current),
+			       worker->current_func);
+			debug_show_held_locks(current);
+			dump_stack();
+		}
 
-	/*
-	 * The following prevents a kworker from hogging CPU on !PREEMPTION
-	 * kernels, where a requeueing work item waiting for something to
-	 * happen could deadlock with stop_machine as such work item could
-	 * indefinitely requeue itself while all other CPUs are trapped in
-	 * stop_machine. At the same time, report a quiescent RCU state so
-	 * the same condition doesn't freeze RCU.
-	 */
-	cond_resched();
+		/*
+		 * The following prevents a kworker from hogging CPU on
+		 * !PREEMPTION kernels, where a requeueing work item waiting for
+		 * something to happen could deadlock with stop_machine as such
+		 * work item could indefinitely requeue itself while all other
+		 * CPUs are trapped in stop_machine. At the same time, report a
+		 * quiescent RCU state so the same condition doesn't freeze RCU.
+		 */
+		if (worker->task)
+			cond_resched();
+	} else {
+		if (unlikely(lockdep_depth(current) > 0)) {
+			pr_err("BUG: atomic workqueue leaked lock: last function: %ps\n",
+			       worker->current_func);
+			debug_show_held_locks(current);
+		}
+	}
 
 	raw_spin_lock_irq(&pool->lock);
 
@@ -3344,6 +3398,44 @@ static int rescuer_thread(void *__rescuer)
 	goto repeat;
 }
 
+void atomic_worker_taskletfn(struct tasklet_struct *tasklet)
+{
+	struct worker *worker =
+		container_of(tasklet, struct worker, atomic_tasklet);
+	struct worker_pool *pool = worker->pool;
+	int nr_restarts = ATOMIC_WORKER_RESTARTS;
+	unsigned long end = jiffies + ATOMIC_WORKER_JIFFIES;
+
+	raw_spin_lock_irq(&pool->lock);
+	worker_leave_idle(worker);
+
+	/*
+	 * This function follows the structure of worker_thread(). See there for
+	 * explanations on each step.
+	 */
+	if (need_more_worker(pool))
+		goto done;
+
+	WARN_ON_ONCE(!list_empty(&worker->scheduled));
+	worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);
+
+	do {
+		struct work_struct *work =
+			list_first_entry(&pool->worklist,
+					 struct work_struct, entry);
+
+		if (assign_work(work, worker, NULL))
+			process_scheduled_works(worker);
+	} while (--nr_restarts && time_before(jiffies, end) &&
+		 keep_working(pool));
+
+	worker_set_flags(worker, WORKER_PREP);
+done:
+	worker_enter_idle(worker);
+	kick_pool(pool);
+	raw_spin_unlock_irq(&pool->lock);
+}
+
 /**
  * check_flush_dependency - check for flush dependency sanity
  * @target_wq: workqueue being flushed
@@ -5149,6 +5241,13 @@ struct workqueue_struct *alloc_workqueue(const char *fmt,
 	size_t wq_size;
 	int name_len;
 
+	if (flags & WQ_ATOMIC) {
+		if (WARN_ON_ONCE(flags & ~__WQ_ATOMIC_ALLOWS))
+			return NULL;
+		if (WARN_ON_ONCE(max_active))
+			return NULL;
+	}
+
 	/*
 	 * Unbound && max_active == 1 used to imply ordered, which is no longer
 	 * the case on many machines due to per-pod pools. While
@@ -7094,6 +7193,22 @@ static void __init restrict_unbound_cpumask(const char *name, const struct cpuma
 	cpumask_and(wq_unbound_cpumask, wq_unbound_cpumask, mask);
 }
 
+static void __init init_cpu_worker_pool(struct worker_pool *pool, int cpu, int nice)
+{
+	BUG_ON(init_worker_pool(pool));
+	pool->cpu = cpu;
+	cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
+	cpumask_copy(pool->attrs->__pod_cpumask, cpumask_of(cpu));
+	pool->attrs->nice = nice;
+	pool->attrs->affn_strict = true;
+	pool->node = cpu_to_node(cpu);
+
+	/* alloc pool ID */
+	mutex_lock(&wq_pool_mutex);
+	BUG_ON(worker_pool_assign_id(pool));
+	mutex_unlock(&wq_pool_mutex);
+}
+
 /**
  * workqueue_init_early - early init for workqueue subsystem
  *
@@ -7149,25 +7264,19 @@ void __init workqueue_init_early(void)
 	pt->pod_node[0] = NUMA_NO_NODE;
 	pt->cpu_pod[0] = 0;
 
-	/* initialize CPU pools */
+	/* initialize atomic and CPU pools */
 	for_each_possible_cpu(cpu) {
 		struct worker_pool *pool;
 
 		i = 0;
-		for_each_cpu_worker_pool(pool, cpu) {
-			BUG_ON(init_worker_pool(pool));
-			pool->cpu = cpu;
-			cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
-			cpumask_copy(pool->attrs->__pod_cpumask, cpumask_of(cpu));
-			pool->attrs->nice = std_nice[i++];
-			pool->attrs->affn_strict = true;
-			pool->node = cpu_to_node(cpu);
-
-			/* alloc pool ID */
-			mutex_lock(&wq_pool_mutex);
-			BUG_ON(worker_pool_assign_id(pool));
-			mutex_unlock(&wq_pool_mutex);
+		for_each_atomic_worker_pool(pool, cpu) {
+			init_cpu_worker_pool(pool, cpu, std_nice[i++]);
+			pool->flags |= POOL_ATOMIC;
 		}
+
+		i = 0;
+		for_each_cpu_worker_pool(pool, cpu)
+			init_cpu_worker_pool(pool, cpu, std_nice[i++]);
 	}
 
 	/* create default unbound and ordered wq attrs */
@@ -7200,10 +7309,14 @@ void __init workqueue_init_early(void)
 	system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_pwr_efficient",
 					      WQ_FREEZABLE | WQ_POWER_EFFICIENT,
 					      0);
+	system_atomic_wq = alloc_workqueue("system_atomic_wq", WQ_ATOMIC, 0);
+	system_atomic_highpri_wq = alloc_workqueue("system_atomic_highpri_wq",
+						   WQ_ATOMIC | WQ_HIGHPRI, 0);
 	BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq ||
 	       !system_unbound_wq || !system_freezable_wq ||
 	       !system_power_efficient_wq ||
-	       !system_freezable_power_efficient_wq);
+	       !system_freezable_power_efficient_wq ||
+	       !system_atomic_wq || !system_atomic_highpri_wq);
 }
 
 static void __init wq_cpu_intensive_thresh_init(void)
@@ -7269,9 +7382,10 @@ void __init workqueue_init(void)
 	 * up. Also, create a rescuer for workqueues that requested it.
 	 */
 	for_each_possible_cpu(cpu) {
-		for_each_cpu_worker_pool(pool, cpu) {
+		for_each_atomic_worker_pool(pool, cpu)
+			pool->node = cpu_to_node(cpu);
+		for_each_cpu_worker_pool(pool, cpu)
 			pool->node = cpu_to_node(cpu);
-		}
 	}
 
 	list_for_each_entry(wq, &workqueues, list) {
@@ -7284,6 +7398,8 @@ void __init workqueue_init(void)
 
 	/* create the initial workers */
 	for_each_online_cpu(cpu) {
+		for_each_atomic_worker_pool(pool, cpu)
+			BUG_ON(!create_worker(pool));
 		for_each_cpu_worker_pool(pool, cpu) {
 			pool->flags &= ~POOL_DISASSOCIATED;
 			BUG_ON(!create_worker(pool));
diff --git a/kernel/workqueue_internal.h b/kernel/workqueue_internal.h
index f6275944ada7..f65f204f38ea 100644
--- a/kernel/workqueue_internal.h
+++ b/kernel/workqueue_internal.h
@@ -10,6 +10,7 @@
 
 #include <linux/workqueue.h>
 #include <linux/kthread.h>
+#include <linux/interrupt.h>
 #include <linux/preempt.h>
 
 struct worker_pool;
@@ -42,6 +43,8 @@ struct worker {
 	struct list_head	scheduled;	/* L: scheduled works */
 
 	struct task_struct	*task;		/* I: worker task */
+	struct tasklet_struct	atomic_tasklet;	/* I: tasklet for atomic pool */
+
 	struct worker_pool	*pool;		/* A: the associated pool */
 						/* L: for rescuers */
 	struct list_head	node;		/* A: anchored at pool->workers */
Allen Jan. 27, 2024, 3:13 a.m. UTC | #10
>
> The following is a draft patch which implements atomic workqueues and
> converts dm-crypt to use them instead of a tasklet. It's an early draft and very
> lightly tested but seems to work more or less. It's on top of wq/for6.9 + a
> pending patchset. The following git branch can be used for testing.
>
>   git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq.git wq-atomic-draft
>
> I'll go over it to make sure all the pieces work. While it adds some
> complications, it doesn't seem too bad and conversion from tasklet should be
> straightforward too.
>
> - It hooks into tasklet[_hi] for now but if we get to update all of tasklet
>   users, we can just repurpose the tasklet softirq slots directly.
>
> - I thought about allowing busy-waits for flushes and cancels but it didn't
>   seem necessary. Keeping them blocking has the benefit of avoiding possible
>   nasty deadlocks. We can revisit if there's need.
>
> - Compared to tasklets, each work item goes through a bit more management
>   code because I wanted to keep the code as unified as possible with regular
>   threaded workqueues. That said, it's not a huge amount and my bet is that
>   the difference is unlikely to be noticeable.
>

Tejun,

  Thank you very much. I really like the approach you have taken in
this patchset.
My design/thought was very similar to the idea you have: to create a
new workqueue specifically for bottom-half execution.
I missed this bit (WQ_ATOMIC | WQ_HIGHPRI) and many other cases that
you have covered in your patch.

  Here's a partial diff of the idea:

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 2cc0a9606175..3eaa80826b8d 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -348,6 +348,7 @@ static inline unsigned int work_static(struct
work_struct *work) { return 0; }
  * Documentation/core-api/workqueue.rst.
  */
 enum {
+       WQ_IRQ                  = 1 << 0, /* for bottom half */
        WQ_UNBOUND              = 1 << 1, /* not bound to any cpu */
        WQ_FREEZABLE            = 1 << 2, /* freeze during suspend */
        WQ_MEM_RECLAIM          = 1 << 3, /* may be used for memory reclaim */
@@ -426,6 +427,7 @@ extern struct workqueue_struct *system_highpri_wq;
 extern struct workqueue_struct *system_long_wq;
 extern struct workqueue_struct *system_unbound_wq;
 extern struct workqueue_struct *system_freezable_wq;
+extern struct workqueue_struct *system_irq_wq;
 extern struct workqueue_struct *system_power_efficient_wq;
 extern struct workqueue_struct *system_freezable_power_efficient_wq;

diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 76e60faed892..64db4fc0d0c7 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -436,6 +436,8 @@ struct workqueue_struct *system_unbound_wq __ro_after_init;
 EXPORT_SYMBOL_GPL(system_unbound_wq);
 struct workqueue_struct *system_freezable_wq __ro_after_init;
 EXPORT_SYMBOL_GPL(system_freezable_wq);
+struct workqueue_struct *system_irq_wq __ro_after_init;
+EXPORT_SYMBOL_GPL(system_irq_wq);
 struct workqueue_struct *system_power_efficient_wq __ro_after_init;
 EXPORT_SYMBOL_GPL(system_power_efficient_wq);
 struct workqueue_struct *system_freezable_power_efficient_wq __ro_after_init;
@@ -6689,13 +6691,15 @@ void __init workqueue_init_early(void)
                                            WQ_MAX_ACTIVE);
        system_freezable_wq = alloc_workqueue("events_freezable",
                                              WQ_FREEZABLE, 0);
+       system_irq_wq = alloc_workqueue("events_irq",
+                                       WQ_IRQ, 0);
        system_power_efficient_wq = alloc_workqueue("events_power_efficient",
                                              WQ_POWER_EFFICIENT, 0);
        system_freezable_power_efficient_wq =
alloc_workqueue("events_freezable_power_efficient",
                                              WQ_FREEZABLE | WQ_POWER_EFFICIENT,
                                              0);
        BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq ||
-              !system_unbound_wq || !system_freezable_wq ||
+              !system_unbound_wq || !system_freezable_wq || !system_irq_wq ||
               !system_power_efficient_wq ||
               !system_freezable_power_efficient_wq);


 This patchset makes a lot more sense to use as a base for the list of
patches that I have that move away from tasklets. I will rebase my work
on top of your branch and test it.

Thanks.

> Thanks.
>
> From 8224d2602ef454ca164f4added765dc4dddd5e16 Mon Sep 17 00:00:00 2001
> From: Tejun Heo <tj@kernel.org>
> Date: Fri, 26 Jan 2024 13:21:42 -1000
> Subject: [PATCH] workqueue: DRAFT: Implement atomic workqueue and convert
>  dmcrypt to use it
>
> ---
>  drivers/md/dm-crypt.c       |  36 +-----
>  include/linux/workqueue.h   |   6 +
>  kernel/workqueue.c          | 234 +++++++++++++++++++++++++++---------
>  kernel/workqueue_internal.h |   3 +
>  4 files changed, 186 insertions(+), 93 deletions(-)
>
> diff --git a/drivers/md/dm-crypt.c b/drivers/md/dm-crypt.c
> index 855b482cbff1..d375285db202 100644
> --- a/drivers/md/dm-crypt.c
> +++ b/drivers/md/dm-crypt.c
> @@ -73,11 +73,8 @@ struct dm_crypt_io {
>         struct bio *base_bio;
>         u8 *integrity_metadata;
>         bool integrity_metadata_from_pool:1;
> -       bool in_tasklet:1;
>
>         struct work_struct work;
> -       struct tasklet_struct tasklet;
> -
>         struct convert_context ctx;
>
>         atomic_t io_pending;
> @@ -1762,7 +1759,6 @@ static void crypt_io_init(struct dm_crypt_io *io, struct crypt_config *cc,
>         io->ctx.r.req = NULL;
>         io->integrity_metadata = NULL;
>         io->integrity_metadata_from_pool = false;
> -       io->in_tasklet = false;
>         atomic_set(&io->io_pending, 0);
>  }
>
> @@ -1771,13 +1767,6 @@ static void crypt_inc_pending(struct dm_crypt_io *io)
>         atomic_inc(&io->io_pending);
>  }
>
> -static void kcryptd_io_bio_endio(struct work_struct *work)
> -{
> -       struct dm_crypt_io *io = container_of(work, struct dm_crypt_io, work);
> -
> -       bio_endio(io->base_bio);
> -}
> -
>  /*
>   * One of the bios was finished. Check for completion of
>   * the whole request and correctly clean up the buffer.
> @@ -1800,21 +1789,6 @@ static void crypt_dec_pending(struct dm_crypt_io *io)
>                 kfree(io->integrity_metadata);
>
>         base_bio->bi_status = error;
> -
> -       /*
> -        * If we are running this function from our tasklet,
> -        * we can't call bio_endio() here, because it will call
> -        * clone_endio() from dm.c, which in turn will
> -        * free the current struct dm_crypt_io structure with
> -        * our tasklet. In this case we need to delay bio_endio()
> -        * execution to after the tasklet is done and dequeued.
> -        */
> -       if (io->in_tasklet) {
> -               INIT_WORK(&io->work, kcryptd_io_bio_endio);
> -               queue_work(cc->io_queue, &io->work);
> -               return;
> -       }
> -
>         bio_endio(base_bio);
>  }
>
> @@ -2246,11 +2220,6 @@ static void kcryptd_crypt(struct work_struct *work)
>                 kcryptd_crypt_write_convert(io);
>  }
>
> -static void kcryptd_crypt_tasklet(unsigned long work)
> -{
> -       kcryptd_crypt((struct work_struct *)work);
> -}
> -
>  static void kcryptd_queue_crypt(struct dm_crypt_io *io)
>  {
>         struct crypt_config *cc = io->cc;
> @@ -2263,9 +2232,8 @@ static void kcryptd_queue_crypt(struct dm_crypt_io *io)
>                  * it is being executed with irqs disabled.
>                  */
>                 if (in_hardirq() || irqs_disabled()) {
> -                       io->in_tasklet = true;
> -                       tasklet_init(&io->tasklet, kcryptd_crypt_tasklet, (unsigned long)&io->work);
> -                       tasklet_schedule(&io->tasklet);
> +                       INIT_WORK(&io->work, kcryptd_crypt);
> +                       queue_work(system_atomic_wq, &io->work);
>                         return;
>                 }
>
> diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
> index 232baea90a1d..1e4938b5b176 100644
> --- a/include/linux/workqueue.h
> +++ b/include/linux/workqueue.h
> @@ -353,6 +353,7 @@ static inline unsigned int work_static(struct work_struct *work) { return 0; }
>   * Documentation/core-api/workqueue.rst.
>   */
>  enum wq_flags {
> +       WQ_ATOMIC               = 1 << 0, /* execute in softirq context */
>         WQ_UNBOUND              = 1 << 1, /* not bound to any cpu */
>         WQ_FREEZABLE            = 1 << 2, /* freeze during suspend */
>         WQ_MEM_RECLAIM          = 1 << 3, /* may be used for memory reclaim */
> @@ -392,6 +393,9 @@ enum wq_flags {
>         __WQ_ORDERED            = 1 << 17, /* internal: workqueue is ordered */
>         __WQ_LEGACY             = 1 << 18, /* internal: create*_workqueue() */
>         __WQ_ORDERED_EXPLICIT   = 1 << 19, /* internal: alloc_ordered_workqueue() */
> +
> +       /* atomic wq only allows the following flags */
> +       __WQ_ATOMIC_ALLOWS      = WQ_ATOMIC | WQ_HIGHPRI,
>  };
>
>  enum wq_consts {
> @@ -442,6 +446,8 @@ extern struct workqueue_struct *system_unbound_wq;
>  extern struct workqueue_struct *system_freezable_wq;
>  extern struct workqueue_struct *system_power_efficient_wq;
>  extern struct workqueue_struct *system_freezable_power_efficient_wq;
> +extern struct workqueue_struct *system_atomic_wq;
> +extern struct workqueue_struct *system_atomic_highpri_wq;
>
>  /**
>   * alloc_workqueue - allocate a workqueue
> diff --git a/kernel/workqueue.c b/kernel/workqueue.c
> index 23740c9ed57a..2a8f21494676 100644
> --- a/kernel/workqueue.c
> +++ b/kernel/workqueue.c
> @@ -73,7 +73,8 @@ enum worker_pool_flags {
>          * wq_pool_attach_mutex to avoid changing binding state while
>          * worker_attach_to_pool() is in progress.
>          */
> -       POOL_MANAGER_ACTIVE     = 1 << 0,       /* being managed */
> +       POOL_ATOMIC             = 1 << 0,       /* is an atomic pool */
> +       POOL_MANAGER_ACTIVE     = 1 << 1,       /* being managed */
>         POOL_DISASSOCIATED      = 1 << 2,       /* cpu can't serve workers */
>  };
>
> @@ -115,6 +116,14 @@ enum wq_internal_consts {
>         WQ_NAME_LEN             = 32,
>  };
>
> +/*
> + * We don't want to trap softirq for too long. See MAX_SOFTIRQ_TIME and
> + * MAX_SOFTIRQ_RESTART in kernel/softirq.c. These are macros because
> + * msecs_to_jiffies() can't be an initializer.
> + */
> +#define ATOMIC_WORKER_JIFFIES  msecs_to_jiffies(2)
> +#define ATOMIC_WORKER_RESTARTS 10
> +
>  /*
>   * Structure fields follow one of the following exclusion rules.
>   *
> @@ -441,8 +450,13 @@ static bool wq_debug_force_rr_cpu = false;
>  #endif
>  module_param_named(debug_force_rr_cpu, wq_debug_force_rr_cpu, bool, 0644);
>
> +/* the atomic worker pools */
> +static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
> +                                    atomic_worker_pools);
> +
>  /* the per-cpu worker pools */
> -static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS], cpu_worker_pools);
> +static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
> +                                    cpu_worker_pools);
>
>  static DEFINE_IDR(worker_pool_idr);    /* PR: idr of all pools */
>
> @@ -476,8 +490,13 @@ struct workqueue_struct *system_power_efficient_wq __ro_after_init;
>  EXPORT_SYMBOL_GPL(system_power_efficient_wq);
>  struct workqueue_struct *system_freezable_power_efficient_wq __ro_after_init;
>  EXPORT_SYMBOL_GPL(system_freezable_power_efficient_wq);
> +struct workqueue_struct *system_atomic_wq;
> +EXPORT_SYMBOL_GPL(system_atomic_wq);
> +struct workqueue_struct *system_atomic_highpri_wq;
> +EXPORT_SYMBOL_GPL(system_atomic_highpri_wq);
>
>  static int worker_thread(void *__worker);
> +static void atomic_worker_taskletfn(struct tasklet_struct *tasklet);
>  static void workqueue_sysfs_unregister(struct workqueue_struct *wq);
>  static void show_pwq(struct pool_workqueue *pwq);
>  static void show_one_worker_pool(struct worker_pool *pool);
> @@ -496,6 +515,11 @@ static void show_one_worker_pool(struct worker_pool *pool);
>                          !lockdep_is_held(&wq_pool_mutex),              \
>                          "RCU, wq->mutex or wq_pool_mutex should be held")
>
> +#define for_each_atomic_worker_pool(pool, cpu)                         \
> +       for ((pool) = &per_cpu(atomic_worker_pools, cpu)[0];            \
> +            (pool) < &per_cpu(atomic_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \
> +            (pool)++)
> +
>  #define for_each_cpu_worker_pool(pool, cpu)                            \
>         for ((pool) = &per_cpu(cpu_worker_pools, cpu)[0];               \
>              (pool) < &per_cpu(cpu_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \
> @@ -1184,6 +1208,14 @@ static bool kick_pool(struct worker_pool *pool)
>         if (!need_more_worker(pool) || !worker)
>                 return false;
>
> +       if (pool->flags & POOL_ATOMIC) {
> +               if (pool->attrs->nice == HIGHPRI_NICE_LEVEL)
> +                       tasklet_hi_schedule(&worker->atomic_tasklet);
> +               else
> +                       tasklet_schedule(&worker->atomic_tasklet);
> +               return true;
> +       }
> +
>         p = worker->task;
>
>  #ifdef CONFIG_SMP
> @@ -1663,8 +1695,15 @@ static bool pwq_tryinc_nr_active(struct pool_workqueue *pwq, bool fill)
>         lockdep_assert_held(&pool->lock);
>
>         if (!nna) {
> -               /* per-cpu workqueue, pwq->nr_active is sufficient */
> -               obtained = pwq->nr_active < READ_ONCE(wq->max_active);
> +               /*
> +                * An atomic workqueue always have a single worker per-cpu and
> +                * doesn't impose additional max_active limit. For a per-cpu
> +                * workqueue, checking pwq->nr_active is sufficient.
> +                */
> +               if (wq->flags & WQ_ATOMIC)
> +                       obtained = true;
> +               else
> +                       obtained = pwq->nr_active < READ_ONCE(wq->max_active);
>                 goto out;
>         }
>
> @@ -2591,27 +2630,31 @@ static struct worker *create_worker(struct worker_pool *pool)
>
>         worker->id = id;
>
> -       if (pool->cpu >= 0)
> -               snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
> -                        pool->attrs->nice < 0  ? "H" : "");
> -       else
> -               snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);
> -
> -       worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
> -                                             "kworker/%s", id_buf);
> -       if (IS_ERR(worker->task)) {
> -               if (PTR_ERR(worker->task) == -EINTR) {
> -                       pr_err("workqueue: Interrupted when creating a worker thread \"kworker/%s\"\n",
> -                              id_buf);
> -               } else {
> -                       pr_err_once("workqueue: Failed to create a worker thread: %pe",
> -                                   worker->task);
> +       if (pool->flags & POOL_ATOMIC) {
> +               tasklet_setup(&worker->atomic_tasklet, atomic_worker_taskletfn);
> +       } else {
> +               if (pool->cpu >= 0)
> +                       snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
> +                                pool->attrs->nice < 0  ? "H" : "");
> +               else
> +                       snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);
> +
> +               worker->task = kthread_create_on_node(worker_thread, worker,
> +                                       pool->node, "kworker/%s", id_buf);
> +               if (IS_ERR(worker->task)) {
> +                       if (PTR_ERR(worker->task) == -EINTR) {
> +                               pr_err("workqueue: Interrupted when creating a worker thread \"kworker/%s\"\n",
> +                                      id_buf);
> +                       } else {
> +                               pr_err_once("workqueue: Failed to create a worker thread: %pe",
> +                                           worker->task);
> +                       }
> +                       goto fail;
>                 }
> -               goto fail;
> -       }
>
> -       set_user_nice(worker->task, pool->attrs->nice);
> -       kthread_bind_mask(worker->task, pool_allowed_cpus(pool));
> +               set_user_nice(worker->task, pool->attrs->nice);
> +               kthread_bind_mask(worker->task, pool_allowed_cpus(pool));
> +       }
>
>         /* successful, attach the worker to the pool */
>         worker_attach_to_pool(worker, pool);
> @@ -2627,7 +2670,8 @@ static struct worker *create_worker(struct worker_pool *pool)
>          * check if not woken up soon. As kick_pool() is noop if @pool is empty,
>          * wake it up explicitly.
>          */
> -       wake_up_process(worker->task);
> +       if (worker->task)
> +               wake_up_process(worker->task);
>
>         raw_spin_unlock_irq(&pool->lock);
>
> @@ -3043,25 +3087,35 @@ __acquires(&pool->lock)
>         lock_map_release(&lockdep_map);
>         lock_map_release(&pwq->wq->lockdep_map);
>
> -       if (unlikely(in_atomic() || lockdep_depth(current) > 0 ||
> -                    rcu_preempt_depth() > 0)) {
> -               pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d/%d\n"
> -                      "     last function: %ps\n",
> -                      current->comm, preempt_count(), rcu_preempt_depth(),
> -                      task_pid_nr(current), worker->current_func);
> -               debug_show_held_locks(current);
> -               dump_stack();
> -       }
> +       if (worker->task) {
> +               if (unlikely(in_atomic() || lockdep_depth(current) > 0 ||
> +                            rcu_preempt_depth() > 0)) {
> +                       pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d/%d\n"
> +                              "     last function: %ps\n",
> +                              current->comm, preempt_count(),
> +                              rcu_preempt_depth(), task_pid_nr(current),
> +                              worker->current_func);
> +                       debug_show_held_locks(current);
> +                       dump_stack();
> +               }
>
> -       /*
> -        * The following prevents a kworker from hogging CPU on !PREEMPTION
> -        * kernels, where a requeueing work item waiting for something to
> -        * happen could deadlock with stop_machine as such work item could
> -        * indefinitely requeue itself while all other CPUs are trapped in
> -        * stop_machine. At the same time, report a quiescent RCU state so
> -        * the same condition doesn't freeze RCU.
> -        */
> -       cond_resched();
> +               /*
> +                * The following prevents a kworker from hogging CPU on
> +                * !PREEMPTION kernels, where a requeueing work item waiting for
> +                * something to happen could deadlock with stop_machine as such
> +                * work item could indefinitely requeue itself while all other
> +                * CPUs are trapped in stop_machine. At the same time, report a
> +                * quiescent RCU state so the same condition doesn't freeze RCU.
> +                */
> +               if (worker->task)
> +                       cond_resched();
> +       } else {
> +               if (unlikely(lockdep_depth(current) > 0)) {
> +                       pr_err("BUG: atomic workqueue leaked lock: last function: %ps\n",
> +                              worker->current_func);
> +                       debug_show_held_locks(current);
> +               }
> +       }
>
>         raw_spin_lock_irq(&pool->lock);
>
> @@ -3344,6 +3398,44 @@ static int rescuer_thread(void *__rescuer)
>         goto repeat;
>  }
>
> +void atomic_worker_taskletfn(struct tasklet_struct *tasklet)
> +{
> +       struct worker *worker =
> +               container_of(tasklet, struct worker, atomic_tasklet);
> +       struct worker_pool *pool = worker->pool;
> +       int nr_restarts = ATOMIC_WORKER_RESTARTS;
> +       unsigned long end = jiffies + ATOMIC_WORKER_JIFFIES;
> +
> +       raw_spin_lock_irq(&pool->lock);
> +       worker_leave_idle(worker);
> +
> +       /*
> +        * This function follows the structure of worker_thread(). See there for
> +        * explanations on each step.
> +        */
> +       if (need_more_worker(pool))
> +               goto done;
> +
> +       WARN_ON_ONCE(!list_empty(&worker->scheduled));
> +       worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);
> +
> +       do {
> +               struct work_struct *work =
> +                       list_first_entry(&pool->worklist,
> +                                        struct work_struct, entry);
> +
> +               if (assign_work(work, worker, NULL))
> +                       process_scheduled_works(worker);
> +       } while (--nr_restarts && time_before(jiffies, end) &&
> +                keep_working(pool));
> +
> +       worker_set_flags(worker, WORKER_PREP);
> +done:
> +       worker_enter_idle(worker);
> +       kick_pool(pool);
> +       raw_spin_unlock_irq(&pool->lock);
> +}
> +
>  /**
>   * check_flush_dependency - check for flush dependency sanity
>   * @target_wq: workqueue being flushed
> @@ -5149,6 +5241,13 @@ struct workqueue_struct *alloc_workqueue(const char *fmt,
>         size_t wq_size;
>         int name_len;
>
> +       if (flags & WQ_ATOMIC) {
> +               if (WARN_ON_ONCE(flags & ~__WQ_ATOMIC_ALLOWS))
> +                       return NULL;
> +               if (WARN_ON_ONCE(max_active))
> +                       return NULL;
> +       }
> +
>         /*
>          * Unbound && max_active == 1 used to imply ordered, which is no longer
>          * the case on many machines due to per-pod pools. While
> @@ -7094,6 +7193,22 @@ static void __init restrict_unbound_cpumask(const char *name, const struct cpuma
>         cpumask_and(wq_unbound_cpumask, wq_unbound_cpumask, mask);
>  }
>
> +static void __init init_cpu_worker_pool(struct worker_pool *pool, int cpu, int nice)
> +{
> +       BUG_ON(init_worker_pool(pool));
> +       pool->cpu = cpu;
> +       cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
> +       cpumask_copy(pool->attrs->__pod_cpumask, cpumask_of(cpu));
> +       pool->attrs->nice = nice;
> +       pool->attrs->affn_strict = true;
> +       pool->node = cpu_to_node(cpu);
> +
> +       /* alloc pool ID */
> +       mutex_lock(&wq_pool_mutex);
> +       BUG_ON(worker_pool_assign_id(pool));
> +       mutex_unlock(&wq_pool_mutex);
> +}
> +
>  /**
>   * workqueue_init_early - early init for workqueue subsystem
>   *
> @@ -7149,25 +7264,19 @@ void __init workqueue_init_early(void)
>         pt->pod_node[0] = NUMA_NO_NODE;
>         pt->cpu_pod[0] = 0;
>
> -       /* initialize CPU pools */
> +       /* initialize atomic and CPU pools */
>         for_each_possible_cpu(cpu) {
>                 struct worker_pool *pool;
>
>                 i = 0;
> -               for_each_cpu_worker_pool(pool, cpu) {
> -                       BUG_ON(init_worker_pool(pool));
> -                       pool->cpu = cpu;
> -                       cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
> -                       cpumask_copy(pool->attrs->__pod_cpumask, cpumask_of(cpu));
> -                       pool->attrs->nice = std_nice[i++];
> -                       pool->attrs->affn_strict = true;
> -                       pool->node = cpu_to_node(cpu);
> -
> -                       /* alloc pool ID */
> -                       mutex_lock(&wq_pool_mutex);
> -                       BUG_ON(worker_pool_assign_id(pool));
> -                       mutex_unlock(&wq_pool_mutex);
> +               for_each_atomic_worker_pool(pool, cpu) {
> +                       init_cpu_worker_pool(pool, cpu, std_nice[i++]);
> +                       pool->flags |= POOL_ATOMIC;
>                 }
> +
> +               i = 0;
> +               for_each_cpu_worker_pool(pool, cpu)
> +                       init_cpu_worker_pool(pool, cpu, std_nice[i++]);
>         }
>
>         /* create default unbound and ordered wq attrs */
> @@ -7200,10 +7309,14 @@ void __init workqueue_init_early(void)
>         system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_pwr_efficient",
>                                               WQ_FREEZABLE | WQ_POWER_EFFICIENT,
>                                               0);
> +       system_atomic_wq = alloc_workqueue("system_atomic_wq", WQ_ATOMIC, 0);
> +       system_atomic_highpri_wq = alloc_workqueue("system_atomic_highpri_wq",
> +                                                  WQ_ATOMIC | WQ_HIGHPRI, 0);
>         BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq ||
>                !system_unbound_wq || !system_freezable_wq ||
>                !system_power_efficient_wq ||
> -              !system_freezable_power_efficient_wq);
> +              !system_freezable_power_efficient_wq ||
> +              !system_atomic_wq || !system_atomic_highpri_wq);
>  }
>
>  static void __init wq_cpu_intensive_thresh_init(void)
> @@ -7269,9 +7382,10 @@ void __init workqueue_init(void)
>          * up. Also, create a rescuer for workqueues that requested it.
>          */
>         for_each_possible_cpu(cpu) {
> -               for_each_cpu_worker_pool(pool, cpu) {
> +               for_each_atomic_worker_pool(pool, cpu)
> +                       pool->node = cpu_to_node(cpu);
> +               for_each_cpu_worker_pool(pool, cpu)
>                         pool->node = cpu_to_node(cpu);
> -               }
>         }
>
>         list_for_each_entry(wq, &workqueues, list) {
> @@ -7284,6 +7398,8 @@ void __init workqueue_init(void)
>
>         /* create the initial workers */
>         for_each_online_cpu(cpu) {
> +               for_each_atomic_worker_pool(pool, cpu)
> +                       BUG_ON(!create_worker(pool));
>                 for_each_cpu_worker_pool(pool, cpu) {
>                         pool->flags &= ~POOL_DISASSOCIATED;
>                         BUG_ON(!create_worker(pool));
> diff --git a/kernel/workqueue_internal.h b/kernel/workqueue_internal.h
> index f6275944ada7..f65f204f38ea 100644
> --- a/kernel/workqueue_internal.h
> +++ b/kernel/workqueue_internal.h
> @@ -10,6 +10,7 @@
>
>  #include <linux/workqueue.h>
>  #include <linux/kthread.h>
> +#include <linux/interrupt.h>
>  #include <linux/preempt.h>
>
>  struct worker_pool;
> @@ -42,6 +43,8 @@ struct worker {
>         struct list_head        scheduled;      /* L: scheduled works */
>
>         struct task_struct      *task;          /* I: worker task */
> +       struct tasklet_struct   atomic_tasklet; /* I: tasklet for atomic pool */
> +
>         struct worker_pool      *pool;          /* A: the associated pool */
>                                                 /* L: for rescuers */
>         struct list_head        node;           /* A: anchored at pool->workers */
> --
> 2.43.0
>
>
Tejun Heo Jan. 27, 2024, 6:37 p.m. UTC | #11
On Fri, Jan 26, 2024 at 01:43:25PM -1000, Tejun Heo wrote:
> Hello,
> 
> The following is a draft patch which implements atomic workqueues and
> convert dm-crypt to use it instead of tasklet. It's an early draft and very
> lightly tested but seems to work more or less. It's on top of wq/for6.9 + a
> pending patchset. The following git branch can be used for testing.
> 
>   git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq.git wq-atomic-draft
> 
> I'll go over it to make sure all the pieces work. While it adds some
> complications, it doesn't seem too bad and conversion from tasklet should be
> straightforward too.
> 
> - It hooks into tasklet[_hi] for now but if we get to update all of tasklet
>   users, we can just repurpose the tasklet softirq slots directly.
> 
> - I thought about allowing busy-waits for flushes and cancels but it didn't
>   seem necessary. Keeping them blocking has the benefit of avoiding possible
>   nasty deadlocks. We can revisit if there's need.
> 
> - Compared to tasklet, each work item goes through a bit more management
>   code because I wanted to keep the code as unified as possible to regular
>   threaded workqueues. That said, it's not a huge amount and my bet is that
>   the difference is unlikely to be noticeable.

Should have known when it worked too well on the first try but I missed a
part in init and this was just running them on per-cpu workqueues. Will post
an actually working version later.

Thanks.
Christoph Hellwig Jan. 29, 2024, 8:26 a.m. UTC | #12
On Fri, Jan 26, 2024 at 07:56:39AM +0100, Hannes Reinecke wrote:
> DM-multipath schedules based on request (if you use the request-based
> interface) or bios (if you use the bio-based interface).
> Any merge decision is typically done by the block layer when combining bios
> into requests; and you can only merge bios if the bvecs are adjacent.
> So if you use bio-based multipathing you will spread sequential bios
> across all paths, leaving the block layer unable to merge requests.

While I think the current code would do that, there is nothing inherent
in the interface forcing it to do that.  The blk_plug allows drivers
to attach callbacks to batch up plugged bios, and the md raid code makes
use of that to get larger I/O, which is rather essential for parity
RAID.
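
For illustration, a rough sketch of that pattern for a bio based
multipath driver, using the existing blk_check_plugged() hook much the
way md raid1 does (all mp_* names below are made up):

#include <linux/blkdev.h>
#include <linux/bio.h>
#include <linux/slab.h>

struct mp_plug_cb {
	struct blk_plug_cb	cb;
	struct bio_list		pending;	/* bios held while plugged */
};

static void mp_unplug(struct blk_plug_cb *cb, bool from_schedule)
{
	struct mp_plug_cb *mp = container_of(cb, struct mp_plug_cb, cb);
	struct bio *bio;

	/* issue the whole batch at once so the block layer can merge it */
	while ((bio = bio_list_pop(&mp->pending)))
		submit_bio_noacct(bio);
	kfree(mp);
}

static void mp_queue_bio(void *path, struct bio *bio)
{
	/* NULL if the submitter did not plug; a new cb is zeroed, so the
	 * embedded bio_list starts out empty */
	struct blk_plug_cb *cb = blk_check_plugged(mp_unplug, path,
						   sizeof(struct mp_plug_cb));

	if (cb) {
		struct mp_plug_cb *mp = container_of(cb, struct mp_plug_cb, cb);

		bio_list_add(&mp->pending, bio);
	} else {
		submit_bio_noacct(bio);
	}
}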

> Another thing is timeouts; bios don't do timeouts, so a bio can run
> for an arbitrary time with no chance of interrupting it.
> Requests do have a timeout, and will be aborted from the driver when
> the timeout is hit.
> Seeing that 99% of all I/O issues I've seen _are_ timeouts it becomes
> a crucial feature if you want dm-multipath to control failover time.

This does not matter - a timed-out bio will return an error to the
submitter even on a request based driver unless the timeout handler
fixed it up.  And there is nothing preventing a bio based driver from
having its own timeout.  We had quite a few in the past.  But dm-rq
doesn't even set a timeout, so for the current situation this is
100% irrelevant.
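
For example, a bio based driver can arm a plain timer per I/O (sketch
only; the mp_* names are made up and the actual path-failure / abort
handling is left out):

#include <linux/blkdev.h>
#include <linux/bio.h>
#include <linux/timer.h>

struct mp_io {
	struct bio		*bio;
	struct timer_list	timeout;
	bool			timed_out;
};

static void mp_io_timeout(struct timer_list *t)
{
	struct mp_io *io = from_timer(io, t, timeout);

	/* real handling would fail the path and retry elsewhere */
	io->timed_out = true;
	pr_warn("mp: I/O did not complete within the failover limit\n");
}

static void mp_submit(struct mp_io *io, struct bio *bio)
{
	io->bio = bio;
	timer_setup(&io->timeout, mp_io_timeout, 0);
	mod_timer(&io->timeout, jiffies + 30 * HZ);
	submit_bio_noacct(bio);
}

/* the completion path must disarm it: del_timer_sync(&io->timeout); */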

So while I'm not sure if using the existing bio based mpath code with
plugging would be enough to suit all users of dm-multipath, I'd
really love to see someone actually trying it.  Removing the request
based path would significantly simplify dm, and also remove a fair
number of hacks from the block layer.
Allen Jan. 29, 2024, 5 p.m. UTC | #13
> The following is a draft patch which implements atomic workqueues and
> convert dm-crypt to use it instead of tasklet. It's an early draft and very
> lightly tested but seems to work more or less. It's on top of wq/for6.9 + a
> pending patchset. The following git branch can be used for testing.
>
>   git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq.git wq-atomic-draft
>
> I'll go over it to make sure all the pieces work. While it adds some
> complications, it doesn't seem too bad and conversion from tasklet should be
> straightforward too.
>
> - It hooks into tasklet[_hi] for now but if we get to update all of tasklet
>   users, we can just repurpose the tasklet softirq slots directly.
>
> - I thought about allowing busy-waits for flushes and cancels but it didn't
>   seem necessary. Keeping them blocking has the benefit of avoiding possible
>   nasty deadlocks. We can revisit if there's need.
>
> - Compared to tasklet, each work item goes through a bit more management
>   code because I wanted to keep the code as unified as possible to regular
>   threaded workqueues. That said, it's not a huge amount and my bet is that
>   the difference is unlikely to be noticeable.
>
> Thanks.
>
> From 8224d2602ef454ca164f4added765dc4dddd5e16 Mon Sep 17 00:00:00 2001
> From: Tejun Heo <tj@kernel.org>
> Date: Fri, 26 Jan 2024 13:21:42 -1000
> Subject: [PATCH] workqueue: DRAFT: Implement atomic workqueue and convert
>  dmcrypt to use it
>
> ---
>  drivers/md/dm-crypt.c       |  36 +-----
>  include/linux/workqueue.h   |   6 +
>  kernel/workqueue.c          | 234 +++++++++++++++++++++++++++---------
>  kernel/workqueue_internal.h |   3 +
>  4 files changed, 186 insertions(+), 93 deletions(-)
>
> diff --git a/drivers/md/dm-crypt.c b/drivers/md/dm-crypt.c
> index 855b482cbff1..d375285db202 100644
> --- a/drivers/md/dm-crypt.c
> +++ b/drivers/md/dm-crypt.c
> @@ -73,11 +73,8 @@ struct dm_crypt_io {
>         struct bio *base_bio;
>         u8 *integrity_metadata;
>         bool integrity_metadata_from_pool:1;
> -       bool in_tasklet:1;
>
>         struct work_struct work;
> -       struct tasklet_struct tasklet;
> -
>         struct convert_context ctx;
>
>         atomic_t io_pending;
> @@ -1762,7 +1759,6 @@ static void crypt_io_init(struct dm_crypt_io *io, struct crypt_config *cc,
>         io->ctx.r.req = NULL;
>         io->integrity_metadata = NULL;
>         io->integrity_metadata_from_pool = false;
> -       io->in_tasklet = false;
>         atomic_set(&io->io_pending, 0);
>  }
>
> @@ -1771,13 +1767,6 @@ static void crypt_inc_pending(struct dm_crypt_io *io)
>         atomic_inc(&io->io_pending);
>  }
>
> -static void kcryptd_io_bio_endio(struct work_struct *work)
> -{
> -       struct dm_crypt_io *io = container_of(work, struct dm_crypt_io, work);
> -
> -       bio_endio(io->base_bio);
> -}
> -
>  /*
>   * One of the bios was finished. Check for completion of
>   * the whole request and correctly clean up the buffer.
> @@ -1800,21 +1789,6 @@ static void crypt_dec_pending(struct dm_crypt_io *io)
>                 kfree(io->integrity_metadata);
>
>         base_bio->bi_status = error;
> -
> -       /*
> -        * If we are running this function from our tasklet,
> -        * we can't call bio_endio() here, because it will call
> -        * clone_endio() from dm.c, which in turn will
> -        * free the current struct dm_crypt_io structure with
> -        * our tasklet. In this case we need to delay bio_endio()
> -        * execution to after the tasklet is done and dequeued.
> -        */
> -       if (io->in_tasklet) {
> -               INIT_WORK(&io->work, kcryptd_io_bio_endio);
> -               queue_work(cc->io_queue, &io->work);
> -               return;
> -       }
> -
>         bio_endio(base_bio);
>  }
>
> @@ -2246,11 +2220,6 @@ static void kcryptd_crypt(struct work_struct *work)
>                 kcryptd_crypt_write_convert(io);
>  }
>
> -static void kcryptd_crypt_tasklet(unsigned long work)
> -{
> -       kcryptd_crypt((struct work_struct *)work);
> -}
> -
>  static void kcryptd_queue_crypt(struct dm_crypt_io *io)
>  {
>         struct crypt_config *cc = io->cc;
> @@ -2263,9 +2232,8 @@ static void kcryptd_queue_crypt(struct dm_crypt_io *io)
>                  * it is being executed with irqs disabled.
>                  */
>                 if (in_hardirq() || irqs_disabled()) {
> -                       io->in_tasklet = true;
> -                       tasklet_init(&io->tasklet, kcryptd_crypt_tasklet, (unsigned long)&io->work);
> -                       tasklet_schedule(&io->tasklet);
> +                       INIT_WORK(&io->work, kcryptd_crypt);
> +                       queue_work(system_atomic_wq, &io->work);
>                         return;
>                 }
>
> diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
> index 232baea90a1d..1e4938b5b176 100644
> --- a/include/linux/workqueue.h
> +++ b/include/linux/workqueue.h
> @@ -353,6 +353,7 @@ static inline unsigned int work_static(struct work_struct *work) { return 0; }
>   * Documentation/core-api/workqueue.rst.
>   */
>  enum wq_flags {
> +       WQ_ATOMIC               = 1 << 0, /* execute in softirq context */
>         WQ_UNBOUND              = 1 << 1, /* not bound to any cpu */
>         WQ_FREEZABLE            = 1 << 2, /* freeze during suspend */
>         WQ_MEM_RECLAIM          = 1 << 3, /* may be used for memory reclaim */
> @@ -392,6 +393,9 @@ enum wq_flags {
>         __WQ_ORDERED            = 1 << 17, /* internal: workqueue is ordered */
>         __WQ_LEGACY             = 1 << 18, /* internal: create*_workqueue() */
>         __WQ_ORDERED_EXPLICIT   = 1 << 19, /* internal: alloc_ordered_workqueue() */
> +
> +       /* atomic wq only allows the following flags */
> +       __WQ_ATOMIC_ALLOWS      = WQ_ATOMIC | WQ_HIGHPRI,
>  };
>
>  enum wq_consts {
> @@ -442,6 +446,8 @@ extern struct workqueue_struct *system_unbound_wq;
>  extern struct workqueue_struct *system_freezable_wq;
>  extern struct workqueue_struct *system_power_efficient_wq;
>  extern struct workqueue_struct *system_freezable_power_efficient_wq;
> +extern struct workqueue_struct *system_atomic_wq;
> +extern struct workqueue_struct *system_atomic_highpri_wq;
>
>  /**
>   * alloc_workqueue - allocate a workqueue
> diff --git a/kernel/workqueue.c b/kernel/workqueue.c
> index 23740c9ed57a..2a8f21494676 100644
> --- a/kernel/workqueue.c
> +++ b/kernel/workqueue.c
> @@ -73,7 +73,8 @@ enum worker_pool_flags {
>          * wq_pool_attach_mutex to avoid changing binding state while
>          * worker_attach_to_pool() is in progress.
>          */
> -       POOL_MANAGER_ACTIVE     = 1 << 0,       /* being managed */
> +       POOL_ATOMIC             = 1 << 0,       /* is an atomic pool */
> +       POOL_MANAGER_ACTIVE     = 1 << 1,       /* being managed */
>         POOL_DISASSOCIATED      = 1 << 2,       /* cpu can't serve workers */
>  };
>
> @@ -115,6 +116,14 @@ enum wq_internal_consts {
>         WQ_NAME_LEN             = 32,
>  };
>
> +/*
> + * We don't want to trap softirq for too long. See MAX_SOFTIRQ_TIME and
> + * MAX_SOFTIRQ_RESTART in kernel/softirq.c. These are macros because
> + * msecs_to_jiffies() can't be an initializer.
> + */
> +#define ATOMIC_WORKER_JIFFIES  msecs_to_jiffies(2)
> +#define ATOMIC_WORKER_RESTARTS 10
> +
>  /*
>   * Structure fields follow one of the following exclusion rules.
>   *
> @@ -441,8 +450,13 @@ static bool wq_debug_force_rr_cpu = false;
>  #endif
>  module_param_named(debug_force_rr_cpu, wq_debug_force_rr_cpu, bool, 0644);
>
> +/* the atomic worker pools */
> +static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
> +                                    atomic_worker_pools);
> +
>  /* the per-cpu worker pools */
> -static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS], cpu_worker_pools);
> +static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
> +                                    cpu_worker_pools);
>
>  static DEFINE_IDR(worker_pool_idr);    /* PR: idr of all pools */
>
> @@ -476,8 +490,13 @@ struct workqueue_struct *system_power_efficient_wq __ro_after_init;
>  EXPORT_SYMBOL_GPL(system_power_efficient_wq);
>  struct workqueue_struct *system_freezable_power_efficient_wq __ro_after_init;
>  EXPORT_SYMBOL_GPL(system_freezable_power_efficient_wq);
> +struct workqueue_struct *system_atomic_wq;
> +EXPORT_SYMBOL_GPL(system_atomic_wq);
> +struct workqueue_struct *system_atomic_highpri_wq;
> +EXPORT_SYMBOL_GPL(system_atomic_highpri_wq);
>
>  static int worker_thread(void *__worker);
> +static void atomic_worker_taskletfn(struct tasklet_struct *tasklet);
>  static void workqueue_sysfs_unregister(struct workqueue_struct *wq);
>  static void show_pwq(struct pool_workqueue *pwq);
>  static void show_one_worker_pool(struct worker_pool *pool);
> @@ -496,6 +515,11 @@ static void show_one_worker_pool(struct worker_pool *pool);
>                          !lockdep_is_held(&wq_pool_mutex),              \
>                          "RCU, wq->mutex or wq_pool_mutex should be held")
>
> +#define for_each_atomic_worker_pool(pool, cpu)                         \
> +       for ((pool) = &per_cpu(atomic_worker_pools, cpu)[0];            \
> +            (pool) < &per_cpu(atomic_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \
> +            (pool)++)
> +
>  #define for_each_cpu_worker_pool(pool, cpu)                            \
>         for ((pool) = &per_cpu(cpu_worker_pools, cpu)[0];               \
>              (pool) < &per_cpu(cpu_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \
> @@ -1184,6 +1208,14 @@ static bool kick_pool(struct worker_pool *pool)
>         if (!need_more_worker(pool) || !worker)
>                 return false;
>
> +       if (pool->flags & POOL_ATOMIC) {
> +               if (pool->attrs->nice == HIGHPRI_NICE_LEVEL)
> +                       tasklet_hi_schedule(&worker->atomic_tasklet);
> +               else
> +                       tasklet_schedule(&worker->atomic_tasklet);
> +               return true;
> +       }
> +
>         p = worker->task;

Tejun,

    I rushed to reply to the draft patch you sent; I should have
looked harder. My apologies.
The idea I have been working on is to completely move away from
using tasklets - essentially, "get rid of tasklets entirely in the
kernel" - so the use of tasklet_schedule() and tasklet_hi_schedule()
will have to go.
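
Each conversion would follow the same shape as the dm-crypt hunk
quoted above - roughly this (my_driver is a made-up example, and
system_atomic_wq is the queue added by your draft):

#include <linux/workqueue.h>

struct my_driver {
	struct work_struct work;	/* replaces struct tasklet_struct */
};

static void my_driver_work(struct work_struct *work)
{
	/*
	 * container_of(work, struct my_driver, work) and the former
	 * tasklet body go here; it still runs in atomic (BH) context.
	 */
}

static void my_driver_init(struct my_driver *d)
{
	INIT_WORK(&d->work, my_driver_work);	/* instead of tasklet_init() */
}

static void my_driver_kick(struct my_driver *d)
{
	/* was: tasklet_schedule(&d->tasklet) */
	queue_work(system_atomic_wq, &d->work);
}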

   I have a very hacky draft that is still WIP. I am going to borrow
many bits from your patch, which will make my work better.

  Perhaps we should start a separate thread, thoughts?

Thanks.


Tejun Heo Jan. 29, 2024, 5:06 p.m. UTC | #14
Hello,

On Mon, Jan 29, 2024 at 09:00:38AM -0800, Allen wrote:
>     I rushed to reply to the draft patch you sent, I should have
> looked harder. My apologies.
> The idea that I have been working on is to completely move away from
> using tasklets.
> Essentially, "get rid of tasklets entirely in the kernel". So, the use
> of tasklet_schedule() & tasklet_hi_schedule()
> will have to go.

The idea is to take over the tasklet[_hi] softirqs once all users are
converted. Otherwise, we run into the problem of setting priorities between
tasklets and the atomic workqueue, which may be a theoretical problem.

>    I have a very hacky draft that is still wip. I am going to borrow
> many bits from your patch which makes
> the work I have better.
> 
>   Perhaps we should start a separate thread, thoughts?

Sure, please go ahead.

Thanks.
Tejun Heo Jan. 30, 2024, 9:19 a.m. UTC | #15
On Sat, Jan 27, 2024 at 08:37:45AM -1000, Tejun Heo wrote:
> Should have known when it worked too well on the first try but I missed a
> part in init and this was just running them on per-cpu workqueues. Will post
> an actually working version later.

For posterity, the patchset is at

 http://lkml.kernel.org/r/20240130091300.2968534-1-tj@kernel.org
Li Lingfeng April 9, 2024, 2:15 p.m. UTC | #16
Hi

I tried to reproduce the problem by adding mdelay() in 
tasklet_action_common() after t->func() but failed.
Since cc->io_queue is created without WQ_UNBOUND, work queued to it
will be executed on the CPU that queued it, which means
bio_endio() will not be called until tasklet_action_common() has finished.
Could you please clarify how to reproduce the problem mentioned here?

Thanks

On 2024/1/26 2:29, Mikulas Patocka wrote:
> Hi
>
> There's a problem with the tasklet API - there is no reliable way how to
> free a structure that contains tasklet_struct. The problem is that the
> function tasklet_action_common calls task_unlock(t) after it called the
> callback. If the callback does something that frees tasklet_struct,
> task_unlock(t) would write into free memory.
>
> dm-crypt does this - it schedules a tasklet with tasklet_schedule, it does
> encryption inside the tasklet handler (because it performs better than
> doing the encryption in a workqueue), then it submits a workqueue entry
> and calls bio_endio from the workqueue entry.
>
> However, if the workqueue preempts ksoftirqd, this race condition happens:
>
> ksoftirqd:
> * tasklet_action_common
> * t->func(t->data)	(that points to kcryptd_crypt_tasklet)
> * kcryptd_crypt_tasklet
> * kcryptd_crypt
> * kcryptd_crypt_read_convert
> * crypt_dec_pending
> * queue_work(cc->io_queue, &io->work);
> now we switch to the workqueue process:
> * kcryptd_io_bio_endio
> * bio_endio(io->base_bio)	(this calls clone_endio)
> * clone_endio
> * free_tio
> * bio_put(clone) - the bio is freed
> now we switch back to ksoftirqd:
> * tasklet_action_common calls task_unlock(t)
> * task_unlock(t) touches memory that was already freed when the bio was freed
>
> dm-verity has a similar problem.
>
> In order to fix this bug, I am proposing to add a new flag
> TASKLET_STATE_ONESHOT. The flag indicates that the tasklet will be
> submitted only once and it prevents tasklet_action_common from touching
> the tasklet after the callback completed.
>
> If you have another idea how to solve this bug, let me know.
>
> Signed-off-by: Mikulas Patocka <mpatocka@redhat.com>
> Fixes: 39d42fa96ba1 ("dm crypt: add flags to optionally bypass kcryptd workqueues")
> Fixes: 5721d4e5a9cd ("dm verity: Add optional "try_verify_in_tasklet" feature")
> Cc: stable@vger.kernel.org	# v5.9+
>
diff mbox series

Patch

Index: linux-2.6/drivers/md/dm-crypt.c
===================================================================
--- linux-2.6.orig/drivers/md/dm-crypt.c	2024-01-18 19:18:30.000000000 +0100
+++ linux-2.6/drivers/md/dm-crypt.c	2024-01-25 16:42:17.000000000 +0100
@@ -2265,6 +2265,7 @@  static void kcryptd_queue_crypt(struct d
 		if (in_hardirq() || irqs_disabled()) {
 			io->in_tasklet = true;
 			tasklet_init(&io->tasklet, kcryptd_crypt_tasklet, (unsigned long)&io->work);
+			tasklet_set_oneshot(&io->tasklet);
 			tasklet_schedule(&io->tasklet);
 			return;
 		}
Index: linux-2.6/include/linux/interrupt.h
===================================================================
--- linux-2.6.orig/include/linux/interrupt.h	2023-11-13 17:39:50.000000000 +0100
+++ linux-2.6/include/linux/interrupt.h	2024-01-25 16:41:52.000000000 +0100
@@ -684,7 +684,9 @@  struct tasklet_struct name = {				\
 enum
 {
 	TASKLET_STATE_SCHED,	/* Tasklet is scheduled for execution */
-	TASKLET_STATE_RUN	/* Tasklet is running (SMP only) */
+	TASKLET_STATE_RUN,	/* Tasklet is running (SMP only) */
+	TASKLET_STATE_ONESHOT	/* Don't unlock the tasklet after the callback
+				   to avoid writing to free memory */
 };
 
 #if defined(CONFIG_SMP) || defined(CONFIG_PREEMPT_RT)
@@ -756,6 +758,11 @@  extern void tasklet_init(struct tasklet_
 extern void tasklet_setup(struct tasklet_struct *t,
 			  void (*callback)(struct tasklet_struct *));
 
+static inline void tasklet_set_oneshot(struct tasklet_struct *t)
+{
+	__set_bit(TASKLET_STATE_ONESHOT, &t->state);
+}
+
 /*
  * Autoprobing for irqs:
  *
Index: linux-2.6/kernel/softirq.c
===================================================================
--- linux-2.6.orig/kernel/softirq.c	2023-10-31 15:31:42.000000000 +0100
+++ linux-2.6/kernel/softirq.c	2024-01-25 17:10:03.000000000 +0100
@@ -774,18 +774,26 @@  static void tasklet_action_common(struct
 
 		if (tasklet_trylock(t)) {
 			if (!atomic_read(&t->count)) {
+				/*
+				 * If oneshot is set, we must not touch the
+				 * tasklet after the callback.
+				 */
+				bool oneshot = test_bit(TASKLET_STATE_ONESHOT, &t->state);
 				if (tasklet_clear_sched(t)) {
 					if (t->use_callback) {
-						trace_tasklet_entry(t, t->callback);
-						t->callback(t);
-						trace_tasklet_exit(t, t->callback);
+						void (*callback)(struct tasklet_struct *) = t->callback;
+						trace_tasklet_entry(t, callback);
+						callback(t);
+						trace_tasklet_exit(t, callback);
 					} else {
-						trace_tasklet_entry(t, t->func);
-						t->func(t->data);
-						trace_tasklet_exit(t, t->func);
+						void (*func)(unsigned long) = t->func;
+						trace_tasklet_entry(t, func);
+						func(t->data);
+						trace_tasklet_exit(t, func);
 					}
 				}
-				tasklet_unlock(t);
+				if (!oneshot)
+					tasklet_unlock(t);
 				continue;
 			}
 			tasklet_unlock(t);
Index: linux-2.6/drivers/md/dm-verity-target.c
===================================================================
--- linux-2.6.orig/drivers/md/dm-verity-target.c	2024-01-18 19:18:30.000000000 +0100
+++ linux-2.6/drivers/md/dm-verity-target.c	2024-01-25 18:12:09.000000000 +0100
@@ -676,6 +676,7 @@  static void verity_end_io(struct bio *bi
 
 	if (static_branch_unlikely(&use_tasklet_enabled) && io->v->use_tasklet) {
 		tasklet_init(&io->tasklet, verity_tasklet, (unsigned long)io);
+		tasklet_set_oneshot(&io->tasklet);
 		tasklet_schedule(&io->tasklet);
 	} else {
 		INIT_WORK(&io->work, verity_work);