Message ID | 76dc3ad69fa457fd1e358ad3de874474f9f64716.1724701542.git.maciej.szmigiero@oracle.com (mailing list archive) |
---|---|
State | New, archived |
Series | Multifd |

"Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:

> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>
> This is necessary for multifd_send() to be able to be called
> from multiple threads.
>
> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
> ---
>  migration/multifd.c | 24 ++++++++++++++++++------
>  1 file changed, 18 insertions(+), 6 deletions(-)
>
> diff --git a/migration/multifd.c b/migration/multifd.c
> index d5a8e5a9c9b5..b25789dde0b3 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -343,26 +343,38 @@ bool multifd_send(MultiFDSendData **send_data)
>          return false;
>      }
>
> -    /* We wait here, until at least one channel is ready */
> -    qemu_sem_wait(&multifd_send_state->channels_ready);
> -
>      /*
>       * next_channel can remain from a previous migration that was
>       * using more channels, so ensure it doesn't overflow if the
>       * limit is lower now.
>       */
> -    next_channel %= migrate_multifd_channels();
> -    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
> +    i = qatomic_load_acquire(&next_channel);
> +    if (unlikely(i >= migrate_multifd_channels())) {
> +        qatomic_cmpxchg(&next_channel, i, 0);
> +    }

Do we still need this? It seems not, because the mod down below would
already truncate to a value less than the number of channels. We don't
need it to start at 0 always, the channels are equivalent.

> +
> +    /* We wait here, until at least one channel is ready */
> +    qemu_sem_wait(&multifd_send_state->channels_ready);
> +
> +    while (true) {
> +        int i_next;
> +
>          if (multifd_send_should_exit()) {
>              return false;
>          }
> +
> +        i = qatomic_load_acquire(&next_channel);
> +        i_next = (i + 1) % migrate_multifd_channels();
> +        if (qatomic_cmpxchg(&next_channel, i, i_next) != i) {
> +            continue;
> +        }

Say channel 'i' is the only one that's idle. What's stopping the other
thread(s) to race at this point and loop around to the same index?

> +
>          p = &multifd_send_state->params[i];
>          /*
>           * Lockless read to p->pending_job is safe, because only multifd
>           * sender thread can clear it.
>           */
>          if (qatomic_read(&p->pending_job) == false) {

With the cmpxchg your other patch adds here, then the race I mentioned
above should be harmless. But we'd need to bring that code into this
patch.

> -            next_channel = (i + 1) % migrate_multifd_channels();
>              break;
>          }
>      }

On 30.08.2024 20:13, Fabiano Rosas wrote:
> "Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:
>
>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>
>> This is necessary for multifd_send() to be able to be called
>> from multiple threads.
>>
>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>> ---
>>  migration/multifd.c | 24 ++++++++++++++++++------
>>  1 file changed, 18 insertions(+), 6 deletions(-)
>>
>> diff --git a/migration/multifd.c b/migration/multifd.c
>> index d5a8e5a9c9b5..b25789dde0b3 100644
>> --- a/migration/multifd.c
>> +++ b/migration/multifd.c
>> @@ -343,26 +343,38 @@ bool multifd_send(MultiFDSendData **send_data)
>>          return false;
>>      }
>>
>> -    /* We wait here, until at least one channel is ready */
>> -    qemu_sem_wait(&multifd_send_state->channels_ready);
>> -
>>      /*
>>       * next_channel can remain from a previous migration that was
>>       * using more channels, so ensure it doesn't overflow if the
>>       * limit is lower now.
>>       */
>> -    next_channel %= migrate_multifd_channels();
>> -    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
>> +    i = qatomic_load_acquire(&next_channel);
>> +    if (unlikely(i >= migrate_multifd_channels())) {
>> +        qatomic_cmpxchg(&next_channel, i, 0);
>> +    }
>
> Do we still need this? It seems not, because the mod down below would
> already truncate to a value less than the number of channels. We don't
> need it to start at 0 always, the channels are equivalent.

The "modulo" operation below forces i_next to be in the proper range,
not i.

If the qatomic_cmpxchg() ends up succeeding then we use the (now out of
bounds) i value to index multifd_send_state->params[].

>> +
>> +    /* We wait here, until at least one channel is ready */
>> +    qemu_sem_wait(&multifd_send_state->channels_ready);
>> +
>> +    while (true) {
>> +        int i_next;
>> +
>>          if (multifd_send_should_exit()) {
>>              return false;
>>          }
>> +
>> +        i = qatomic_load_acquire(&next_channel);
>> +        i_next = (i + 1) % migrate_multifd_channels();
>> +        if (qatomic_cmpxchg(&next_channel, i, i_next) != i) {
>> +            continue;
>> +        }
>
> Say channel 'i' is the only one that's idle. What's stopping the other
> thread(s) to race at this point and loop around to the same index?

See the reply below.

>> +
>>          p = &multifd_send_state->params[i];
>>          /*
>>           * Lockless read to p->pending_job is safe, because only multifd
>>           * sender thread can clear it.
>>           */
>>          if (qatomic_read(&p->pending_job) == false) {
>
> With the cmpxchg your other patch adds here, then the race I mentioned
> above should be harmless. But we'd need to bring that code into this
> patch.
>

You're right - the sender code with this patch alone isn't thread safe
yet but this commit is only literally about "converting
multifd_send()::next_channel to atomic".

At the time of this patch there aren't any multifd_send() calls from
multiple threads, and the commit that introduces such possible call
site (multifd_queue_device_state()) also modifies multifd_send()
to be fully thread safe by introducing p->pending_job_preparing.

Thanks,
Maciej

"Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:

> On 30.08.2024 20:13, Fabiano Rosas wrote:
>> "Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:
>>
>>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>>
>>> This is necessary for multifd_send() to be able to be called
>>> from multiple threads.
>>>
>>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>>> ---
>>>  migration/multifd.c | 24 ++++++++++++++++++------
>>>  1 file changed, 18 insertions(+), 6 deletions(-)
>>>
>>> diff --git a/migration/multifd.c b/migration/multifd.c
>>> index d5a8e5a9c9b5..b25789dde0b3 100644
>>> --- a/migration/multifd.c
>>> +++ b/migration/multifd.c
>>> @@ -343,26 +343,38 @@ bool multifd_send(MultiFDSendData **send_data)
>>>          return false;
>>>      }
>>>
>>> -    /* We wait here, until at least one channel is ready */
>>> -    qemu_sem_wait(&multifd_send_state->channels_ready);
>>> -
>>>      /*
>>>       * next_channel can remain from a previous migration that was
>>>       * using more channels, so ensure it doesn't overflow if the
>>>       * limit is lower now.
>>>       */
>>> -    next_channel %= migrate_multifd_channels();
>>> -    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
>>> +    i = qatomic_load_acquire(&next_channel);
>>> +    if (unlikely(i >= migrate_multifd_channels())) {
>>> +        qatomic_cmpxchg(&next_channel, i, 0);
>>> +    }
>>
>> Do we still need this? It seems not, because the mod down below would
>> already truncate to a value less than the number of channels. We don't
>> need it to start at 0 always, the channels are equivalent.
>
> The "modulo" operation below forces i_next to be in the proper range,
> not i.
>
> If the qatomic_cmpxchg() ends up succeeding then we use the (now out of
> bounds) i value to index multifd_send_state->params[].

Indeed.

>
>>> +
>>> +    /* We wait here, until at least one channel is ready */
>>> +    qemu_sem_wait(&multifd_send_state->channels_ready);
>>> +
>>> +    while (true) {
>>> +        int i_next;
>>> +
>>>          if (multifd_send_should_exit()) {
>>>              return false;
>>>          }
>>> +
>>> +        i = qatomic_load_acquire(&next_channel);
>>> +        i_next = (i + 1) % migrate_multifd_channels();
>>> +        if (qatomic_cmpxchg(&next_channel, i, i_next) != i) {
>>> +            continue;
>>> +        }
>>
>> Say channel 'i' is the only one that's idle. What's stopping the other
>> thread(s) to race at this point and loop around to the same index?
>
> See the reply below.
>
>>> +
>>>          p = &multifd_send_state->params[i];
>>>          /*
>>>           * Lockless read to p->pending_job is safe, because only multifd
>>>           * sender thread can clear it.
>>>           */
>>>          if (qatomic_read(&p->pending_job) == false) {
>>
>> With the cmpxchg your other patch adds here, then the race I mentioned
>> above should be harmless. But we'd need to bring that code into this
>> patch.
>>
>
> You're right - the sender code with this patch alone isn't thread safe
> yet but this commit is only literally about "converting
> multifd_send()::next_channel to atomic".
>
> At the time of this patch there aren't any multifd_send() calls from
> multiple threads, and the commit that introduces such possible call
> site (multifd_queue_device_state()) also modifies multifd_send()
> to be fully thread safe by introducing p->pending_job_preparing.

In general this would be a bad practice because this commit can end up
being moved around due to backporting or bisecting. It would be better
if it were complete from the start. It also affects backporting due to
ambiguity on where the Fixes tag should point to if someone eventually
finds a bug.

I already asked you to extract the other code into a separate patch, so
it's not that bad. If you prefer to keep both changes separate for
clarity, please note on the commit message that the next patch is
necessary for correctness.

>
> Thanks,
> Maciej

On 3.09.2024 17:01, Fabiano Rosas wrote:
> "Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:
>
>> On 30.08.2024 20:13, Fabiano Rosas wrote:
>>> "Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:
>>>
>>>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>>>
>>>> This is necessary for multifd_send() to be able to be called
>>>> from multiple threads.
>>>>
>>>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>>>> ---
>>>>  migration/multifd.c | 24 ++++++++++++++++++------
>>>>  1 file changed, 18 insertions(+), 6 deletions(-)
>>>>
>>>> diff --git a/migration/multifd.c b/migration/multifd.c
>>>> index d5a8e5a9c9b5..b25789dde0b3 100644
>>>> --- a/migration/multifd.c
>>>> +++ b/migration/multifd.c
(..)
>>>> +
>>>> +    /* We wait here, until at least one channel is ready */
>>>> +    qemu_sem_wait(&multifd_send_state->channels_ready);
>>>> +
>>>> +    while (true) {
>>>> +        int i_next;
>>>> +
>>>>          if (multifd_send_should_exit()) {
>>>>              return false;
>>>>          }
>>>> +
>>>> +        i = qatomic_load_acquire(&next_channel);
>>>> +        i_next = (i + 1) % migrate_multifd_channels();
>>>> +        if (qatomic_cmpxchg(&next_channel, i, i_next) != i) {
>>>> +            continue;
>>>> +        }
>>>
>>> Say channel 'i' is the only one that's idle. What's stopping the other
>>> thread(s) to race at this point and loop around to the same index?
>>
>> See the reply below.
>>
>>>> +
>>>>          p = &multifd_send_state->params[i];
>>>>          /*
>>>>           * Lockless read to p->pending_job is safe, because only multifd
>>>>           * sender thread can clear it.
>>>>           */
>>>>          if (qatomic_read(&p->pending_job) == false) {
>>>
>>> With the cmpxchg your other patch adds here, then the race I mentioned
>>> above should be harmless. But we'd need to bring that code into this
>>> patch.
>>>
>>
>> You're right - the sender code with this patch alone isn't thread safe
>> yet but this commit is only literally about "converting
>> multifd_send()::next_channel to atomic".
>>
>> At the time of this patch there aren't any multifd_send() calls from
>> multiple threads, and the commit that introduces such possible call
>> site (multifd_queue_device_state()) also modifies multifd_send()
>> to be fully thread safe by introducing p->pending_job_preparing.
>
> In general this would be a bad practice because this commit can end up
> being moved around due to backporting or bisecting. It would be better
> if it were complete from the start. It also affects backporting due to
> ambiguity on where the Fixes tag should point to if someone eventually
> finds a bug.
>
> I already asked you to extract the other code into a separate patch, so
> it's not that bad. If you prefer to keep both changes separate for
> clarity, please note on the commit message that the next patch is
> necessary for correctness.
>

If someone picks parts of a patch set or reorders commits then I guess
in many cases things can break indeed.

But it looks like I will be able to move code changes around to have
multifd_send() already thread safe by the time of this commit so I will
do that.

Thanks,
Maciej

On Tue, Aug 27, 2024 at 07:54:29PM +0200, Maciej S. Szmigiero wrote:
> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>
> This is necessary for multifd_send() to be able to be called
> from multiple threads.
>
> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>

Would it be much simpler to just use a mutex for enqueue? Something like:

===8<===
diff --git a/migration/multifd.c b/migration/multifd.c
index 9b200f4ad9..979c9748b5 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -69,6 +69,8 @@ struct {
     QemuSemaphore channels_created;
     /* send channels ready */
     QemuSemaphore channels_ready;
+    /* Mutex to serialize multifd enqueues */
+    QemuMutex multifd_send_mutex;
     /*
      * Have we already run terminate threads. There is a race when it
      * happens that we got one error while we are exiting.
@@ -305,6 +307,8 @@ bool multifd_send(MultiFDSendData **send_data)
     MultiFDSendParams *p = NULL; /* make happy gcc */
     MultiFDSendData *tmp;

+    QEMU_LOCK_GUARD(&multifd_send_state->multifd_send_mutex);
+
     if (multifd_send_should_exit()) {
         return false;
     }
@@ -824,6 +828,7 @@ bool multifd_send_setup(void)
     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
     qemu_sem_init(&multifd_send_state->channels_created, 0);
     qemu_sem_init(&multifd_send_state->channels_ready, 0);
+    qemu_mutex_init(&multifd_send_state->multifd_send_mutex);
     qatomic_set(&multifd_send_state->exiting, 0);
     multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];

===8<===

Then none of the other details would need to change (and the performance
should be similar)?
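
To make the mutex alternative concrete outside of QEMU, here is a minimal sketch of a serialized round-robin enqueue. It uses a plain pthread mutex in place of `QemuMutex`/`QEMU_LOCK_GUARD`, and `NSLOTS`, `slot_busy`, `next_slot`, and `enqueue_locked()` are hypothetical names. It also returns -1 instead of blocking on a semaphore when every slot is busy, which is a simplification of what `multifd_send()` does.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

#define NSLOTS 4 /* hypothetical channel count */

static pthread_mutex_t enqueue_lock = PTHREAD_MUTEX_INITIALIZER;
static bool slot_busy[NSLOTS];
static int next_slot; /* plain int: only read or written under enqueue_lock */

/*
 * Pick the next idle slot in round-robin order while holding the lock.
 * Returns the claimed index, or -1 if every slot is busy.
 */
static int enqueue_locked(void)
{
    int claimed = -1;

    assert(NSLOTS > 0);
    pthread_mutex_lock(&enqueue_lock);
    for (int n = 0; n < NSLOTS; n++) {
        int i = (next_slot + n) % NSLOTS;

        if (!slot_busy[i]) {
            slot_busy[i] = true;           /* claim the slot */
            next_slot = (i + 1) % NSLOTS;  /* resume after it next time */
            claimed = i;
            break;
        }
    }
    pthread_mutex_unlock(&enqueue_lock);
    return claimed;
}
```

Because the scan and the claim happen under one lock, the index needs no atomic operations on the enqueue side. A worker that releases a slot from another thread would still have to clear `slot_busy[i]` under the same lock or with an atomic store; that part is left out of the sketch.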

diff --git a/migration/multifd.c b/migration/multifd.c
index d5a8e5a9c9b5..b25789dde0b3 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -343,26 +343,38 @@ bool multifd_send(MultiFDSendData **send_data)
         return false;
     }

-    /* We wait here, until at least one channel is ready */
-    qemu_sem_wait(&multifd_send_state->channels_ready);
-
     /*
      * next_channel can remain from a previous migration that was
      * using more channels, so ensure it doesn't overflow if the
      * limit is lower now.
      */
-    next_channel %= migrate_multifd_channels();
-    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
+    i = qatomic_load_acquire(&next_channel);
+    if (unlikely(i >= migrate_multifd_channels())) {
+        qatomic_cmpxchg(&next_channel, i, 0);
+    }
+
+    /* We wait here, until at least one channel is ready */
+    qemu_sem_wait(&multifd_send_state->channels_ready);
+
+    while (true) {
+        int i_next;
+
         if (multifd_send_should_exit()) {
             return false;
         }
+
+        i = qatomic_load_acquire(&next_channel);
+        i_next = (i + 1) % migrate_multifd_channels();
+        if (qatomic_cmpxchg(&next_channel, i, i_next) != i) {
+            continue;
+        }
+
         p = &multifd_send_state->params[i];
         /*
          * Lockless read to p->pending_job is safe, because only multifd
          * sender thread can clear it.
          */
         if (qatomic_read(&p->pending_job) == false) {
-            next_channel = (i + 1) % migrate_multifd_channels();
             break;
         }
     }
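
The index handling in the patch can be tried in isolation with C11 atomics. The sketch below is not the QEMU code: `next_slot` and `claim_next_slot()` are hypothetical names, `<stdatomic.h>` stands in for `qatomic_load_acquire()`/`qatomic_cmpxchg()`, and as a variation on the patch the out-of-range check sits inside the retry loop rather than before it. It illustrates the point made in the thread that the modulo only bounds the stored successor, so a stale out-of-range value has to be reset before it is used as an index.

```c
#include <assert.h>
#include <stdatomic.h>

/* Hypothetical stand-in for multifd_send()::next_channel. */
static atomic_int next_slot;

/*
 * Atomically take the current round-robin index and advance the shared
 * counter.  Always returns a value in [0, nslots).
 */
static int claim_next_slot(int nslots)
{
    assert(nslots > 0);

    for (;;) {
        int i = atomic_load_explicit(&next_slot, memory_order_acquire);

        if (i >= nslots) {
            /* Stale value left over from a run with more slots: reset it. */
            atomic_compare_exchange_strong(&next_slot, &i, 0);
            continue;
        }

        int i_next = (i + 1) % nslots;

        /* Only the caller whose exchange succeeds gets to use index i. */
        if (atomic_compare_exchange_strong(&next_slot, &i, i_next)) {
            return i;
        }
        /* Another caller advanced the counter first; reload and retry. */
    }
}
```

This only hands out distinct consecutive indices. It does not by itself guarantee that the slot at the returned index is idle, which is the separate concern discussed in the replies above.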