[net-next,06/15] net/smc: implement DMB-related operations of loopback-ism

Message ID 20240111120036.109903-7-guwen@linux.alibaba.com (mailing list archive)
State Deferred
Delegated to: Netdev Maintainers
Series net/smc: implement loopback-ism used by SMC-D

Checks

Context Check Description
netdev/series_format success Posting correctly formatted
netdev/tree_selection success Clearly marked for net-next
netdev/ynl success Generated files up to date; no warnings/errors; no diff in generated;
netdev/fixes_present success Fixes tag not required for -next series
netdev/header_inline success No static functions without inline keyword in header files
netdev/build_32bit success Errors and warnings before: 1081 this patch: 1081
netdev/cc_maintainers success CCed 0 of 0 maintainers
netdev/build_clang success Errors and warnings before: 1108 this patch: 1108
netdev/verify_signedoff success Signed-off-by tag matches author and committer
netdev/deprecated_api success None detected
netdev/check_selftest success No net selftest shell script
netdev/verify_fixes success No Fixes tag
netdev/build_allmodconfig_warn success Errors and warnings before: 1108 this patch: 1108
netdev/checkpatch success total: 0 errors, 0 warnings, 0 checks, 217 lines checked
netdev/build_clang_rust success No Rust files in patch. Skipping build
netdev/kdoc success Errors and warnings before: 0 this patch: 0
netdev/source_inline success Was 0 now: 0

Commit Message

Wen Gu Jan. 11, 2024, noon UTC
This implements DMB (un)registration and data move operations of
loopback-ism device.

Signed-off-by: Wen Gu <guwen@linux.alibaba.com>
---
 net/smc/smc_cdc.c      |   6 ++
 net/smc/smc_cdc.h      |   1 +
 net/smc/smc_loopback.c | 133 ++++++++++++++++++++++++++++++++++++++++-
 net/smc/smc_loopback.h |  13 ++++
 4 files changed, 150 insertions(+), 3 deletions(-)

Comments

Wenjia Zhang Feb. 16, 2024, 2:13 p.m. UTC | #1
On 11.01.24 13:00, Wen Gu wrote:
> This implements DMB (un)registration and data move operations of
> loopback-ism device.
> 
> Signed-off-by: Wen Gu <guwen@linux.alibaba.com>
> ---
>   net/smc/smc_cdc.c      |   6 ++
>   net/smc/smc_cdc.h      |   1 +
>   net/smc/smc_loopback.c | 133 ++++++++++++++++++++++++++++++++++++++++-
>   net/smc/smc_loopback.h |  13 ++++
>   4 files changed, 150 insertions(+), 3 deletions(-)
> 
> diff --git a/net/smc/smc_cdc.c b/net/smc/smc_cdc.c
> index 3c06625ceb20..c820ef197610 100644
> --- a/net/smc/smc_cdc.c
> +++ b/net/smc/smc_cdc.c
> @@ -410,6 +410,12 @@ static void smc_cdc_msg_recv(struct smc_sock *smc, struct smc_cdc_msg *cdc)
>   static void smcd_cdc_rx_tsklet(struct tasklet_struct *t)
>   {
>   	struct smc_connection *conn = from_tasklet(conn, t, rx_tsklet);
> +
> +	smcd_cdc_rx_handler(conn);
> +}
> +
> +void smcd_cdc_rx_handler(struct smc_connection *conn)
> +{
>   	struct smcd_cdc_msg *data_cdc;
>   	struct smcd_cdc_msg cdc;
>   	struct smc_sock *smc;
> diff --git a/net/smc/smc_cdc.h b/net/smc/smc_cdc.h
> index 696cc11f2303..11559d4ebf2b 100644
> --- a/net/smc/smc_cdc.h
> +++ b/net/smc/smc_cdc.h
> @@ -301,5 +301,6 @@ int smcr_cdc_msg_send_validation(struct smc_connection *conn,
>   				 struct smc_wr_buf *wr_buf);
>   int smc_cdc_init(void) __init;
>   void smcd_cdc_rx_init(struct smc_connection *conn);
> +void smcd_cdc_rx_handler(struct smc_connection *conn);
>   
>   #endif /* SMC_CDC_H */
> diff --git a/net/smc/smc_loopback.c b/net/smc/smc_loopback.c
> index 353d4a2d69a1..f72e7b24fc1a 100644
> --- a/net/smc/smc_loopback.c
> +++ b/net/smc/smc_loopback.c
> @@ -15,11 +15,13 @@
>   #include <linux/types.h>
>   #include <net/smc.h>
>   
> +#include "smc_cdc.h"
>   #include "smc_ism.h"
>   #include "smc_loopback.h"
>   
>   #if IS_ENABLED(CONFIG_SMC_LO)
>   #define SMC_LO_V2_CAPABLE	0x1 /* loopback-ism acts as ISMv2 */
> +#define SMC_DMA_ADDR_INVALID	(~(dma_addr_t)0)
>   
>   static const char smc_lo_dev_name[] = "loopback-ism";
>   static struct smc_lo_dev *lo_dev;
> @@ -50,6 +52,97 @@ static int smc_lo_query_rgid(struct smcd_dev *smcd, struct smcd_gid *rgid,
>   	return 0;
>   }
>   
> +static int smc_lo_register_dmb(struct smcd_dev *smcd, struct smcd_dmb *dmb,
> +			       void *client_priv)
> +{
> +	struct smc_lo_dmb_node *dmb_node, *tmp_node;
> +	struct smc_lo_dev *ldev = smcd->priv;
> +	int sba_idx, order, rc;
> +	struct page *pages;
> +
> +	/* check space for new dmb */
> +	for_each_clear_bit(sba_idx, ldev->sba_idx_mask, SMC_LO_MAX_DMBS) {
> +		if (!test_and_set_bit(sba_idx, ldev->sba_idx_mask))
> +			break;
> +	}
> +	if (sba_idx == SMC_LO_MAX_DMBS)
> +		return -ENOSPC;
> +
> +	dmb_node = kzalloc(sizeof(*dmb_node), GFP_KERNEL);
> +	if (!dmb_node) {
> +		rc = -ENOMEM;
> +		goto err_bit;
> +	}
> +
> +	dmb_node->sba_idx = sba_idx;
> +	order = get_order(dmb->dmb_len);
> +	pages = alloc_pages(GFP_KERNEL | __GFP_NOWARN |
> +			    __GFP_NOMEMALLOC | __GFP_COMP |
> +			    __GFP_NORETRY | __GFP_ZERO,
> +			    order);
> +	if (!pages) {
> +		rc = -ENOMEM;
> +		goto err_node;
> +	}
> +	dmb_node->cpu_addr = (void *)page_address(pages);
> +	dmb_node->len = dmb->dmb_len;
> +	dmb_node->dma_addr = SMC_DMA_ADDR_INVALID;
> +
> +again:
> +	/* add new dmb into hash table */
> +	get_random_bytes(&dmb_node->token, sizeof(dmb_node->token));
> +	write_lock(&ldev->dmb_ht_lock);
> +	hash_for_each_possible(ldev->dmb_ht, tmp_node, list, dmb_node->token) {
> +		if (tmp_node->token == dmb_node->token) {
> +			write_unlock(&ldev->dmb_ht_lock);
> +			goto again;
> +		}
> +	}
> +	hash_add(ldev->dmb_ht, &dmb_node->list, dmb_node->token);
> +	write_unlock(&ldev->dmb_ht_lock);
> +
The write_lock_irqsave()/write_unlock_irqrestore() and 
read_lock_irqsave()/read_unlock_irqrestore() should be used instead of 
write_lock()/write_unlock() and read_lock()/read_unlock() in order to 
keep the lock irq-safe.
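
For illustration, a minimal sketch of what the irq-safe variant of this
insertion could look like, factored into a helper (the helper name is made
up; the struct, field and lock names are the ones from this patch):

static void smc_lo_dmb_ht_insert(struct smc_lo_dev *ldev,
				 struct smc_lo_dmb_node *dmb_node)
{
	struct smc_lo_dmb_node *tmp_node;
	unsigned long flags;

again:
	/* pick a token that is not yet used in the hash table */
	get_random_bytes(&dmb_node->token, sizeof(dmb_node->token));
	write_lock_irqsave(&ldev->dmb_ht_lock, flags);
	hash_for_each_possible(ldev->dmb_ht, tmp_node, list, dmb_node->token) {
		if (tmp_node->token == dmb_node->token) {
			write_unlock_irqrestore(&ldev->dmb_ht_lock, flags);
			goto again;
		}
	}
	hash_add(ldev->dmb_ht, &dmb_node->list, dmb_node->token);
	write_unlock_irqrestore(&ldev->dmb_ht_lock, flags);
}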

> +	dmb->sba_idx = dmb_node->sba_idx;
> +	dmb->dmb_tok = dmb_node->token;
> +	dmb->cpu_addr = dmb_node->cpu_addr;
> +	dmb->dma_addr = dmb_node->dma_addr;
> +	dmb->dmb_len = dmb_node->len;
> +
> +	return 0;
> +
> +err_node:
> +	kfree(dmb_node);
> +err_bit:
> +	clear_bit(sba_idx, ldev->sba_idx_mask);
> +	return rc;
> +}
> +
> +static int smc_lo_unregister_dmb(struct smcd_dev *smcd, struct smcd_dmb *dmb)
> +{
> +	struct smc_lo_dmb_node *dmb_node = NULL, *tmp_node;
> +	struct smc_lo_dev *ldev = smcd->priv;
> +
> +	/* remove dmb from hash table */
> +	write_lock(&ldev->dmb_ht_lock);
> +	hash_for_each_possible(ldev->dmb_ht, tmp_node, list, dmb->dmb_tok) {
> +		if (tmp_node->token == dmb->dmb_tok) {
> +			dmb_node = tmp_node;
> +			break;
> +		}
> +	}
> +	if (!dmb_node) {
> +		write_unlock(&ldev->dmb_ht_lock);
> +		return -EINVAL;
> +	}
> +	hash_del(&dmb_node->list);
> +	write_unlock(&ldev->dmb_ht_lock);
> +
> +	clear_bit(dmb_node->sba_idx, ldev->sba_idx_mask);
> +	kfree(dmb_node->cpu_addr);
> +	kfree(dmb_node);
> +
> +	return 0;
> +}
> +
>   static int smc_lo_add_vlan_id(struct smcd_dev *smcd, u64 vlan_id)
>   {
>   	return -EOPNOTSUPP;
> @@ -76,6 +169,38 @@ static int smc_lo_signal_event(struct smcd_dev *dev, struct smcd_gid *rgid,
>   	return 0;
>   }
>   
> +static int smc_lo_move_data(struct smcd_dev *smcd, u64 dmb_tok,
> +			    unsigned int idx, bool sf, unsigned int offset,
> +			    void *data, unsigned int size)
> +{
> +	struct smc_lo_dmb_node *rmb_node = NULL, *tmp_node;
> +	struct smc_lo_dev *ldev = smcd->priv;
> +
> +	read_lock(&ldev->dmb_ht_lock);
> +	hash_for_each_possible(ldev->dmb_ht, tmp_node, list, dmb_tok) {
> +		if (tmp_node->token == dmb_tok) {
> +			rmb_node = tmp_node;
> +			break;
> +		}
> +	}
> +	if (!rmb_node) {
> +		read_unlock(&ldev->dmb_ht_lock);
> +		return -EINVAL;
> +	}
> +	read_unlock(&ldev->dmb_ht_lock);
> +
> +	memcpy((char *)rmb_node->cpu_addr + offset, data, size);
> +

Should this read_unlock() be placed after the memcpy()?

<...>
Wen Gu Feb. 20, 2024, 1:55 a.m. UTC | #2
On 2024/2/16 22:13, Wenjia Zhang wrote:
> 
> 
> On 11.01.24 13:00, Wen Gu wrote:
>> This implements DMB (un)registration and data move operations of
>> loopback-ism device.
>>
>> Signed-off-by: Wen Gu <guwen@linux.alibaba.com>
>> ---
>>   net/smc/smc_cdc.c      |   6 ++
>>   net/smc/smc_cdc.h      |   1 +
>>   net/smc/smc_loopback.c | 133 ++++++++++++++++++++++++++++++++++++++++-
>>   net/smc/smc_loopback.h |  13 ++++
>>   4 files changed, 150 insertions(+), 3 deletions(-)
>>
>> diff --git a/net/smc/smc_cdc.c b/net/smc/smc_cdc.c
>> index 3c06625ceb20..c820ef197610 100644
>> --- a/net/smc/smc_cdc.c
>> +++ b/net/smc/smc_cdc.c
>> @@ -410,6 +410,12 @@ static void smc_cdc_msg_recv(struct smc_sock *smc, struct smc_cdc_msg *cdc)
>>   static void smcd_cdc_rx_tsklet(struct tasklet_struct *t)
>>   {
>>       struct smc_connection *conn = from_tasklet(conn, t, rx_tsklet);
>> +
>> +    smcd_cdc_rx_handler(conn);
>> +}
>> +
>> +void smcd_cdc_rx_handler(struct smc_connection *conn)
>> +{
>>       struct smcd_cdc_msg *data_cdc;
>>       struct smcd_cdc_msg cdc;
>>       struct smc_sock *smc;
>> diff --git a/net/smc/smc_cdc.h b/net/smc/smc_cdc.h
>> index 696cc11f2303..11559d4ebf2b 100644
>> --- a/net/smc/smc_cdc.h
>> +++ b/net/smc/smc_cdc.h
>> @@ -301,5 +301,6 @@ int smcr_cdc_msg_send_validation(struct smc_connection *conn,
>>                    struct smc_wr_buf *wr_buf);
>>   int smc_cdc_init(void) __init;
>>   void smcd_cdc_rx_init(struct smc_connection *conn);
>> +void smcd_cdc_rx_handler(struct smc_connection *conn);
>>   #endif /* SMC_CDC_H */
>> diff --git a/net/smc/smc_loopback.c b/net/smc/smc_loopback.c
>> index 353d4a2d69a1..f72e7b24fc1a 100644
>> --- a/net/smc/smc_loopback.c
>> +++ b/net/smc/smc_loopback.c
>> @@ -15,11 +15,13 @@
>>   #include <linux/types.h>
>>   #include <net/smc.h>
>> +#include "smc_cdc.h"
>>   #include "smc_ism.h"
>>   #include "smc_loopback.h"
>>   #if IS_ENABLED(CONFIG_SMC_LO)
>>   #define SMC_LO_V2_CAPABLE    0x1 /* loopback-ism acts as ISMv2 */
>> +#define SMC_DMA_ADDR_INVALID    (~(dma_addr_t)0)
>>   static const char smc_lo_dev_name[] = "loopback-ism";
>>   static struct smc_lo_dev *lo_dev;
>> @@ -50,6 +52,97 @@ static int smc_lo_query_rgid(struct smcd_dev *smcd, struct smcd_gid *rgid,
>>       return 0;
>>   }
>> +static int smc_lo_register_dmb(struct smcd_dev *smcd, struct smcd_dmb *dmb,
>> +                   void *client_priv)
>> +{
>> +    struct smc_lo_dmb_node *dmb_node, *tmp_node;
>> +    struct smc_lo_dev *ldev = smcd->priv;
>> +    int sba_idx, order, rc;
>> +    struct page *pages;
>> +
>> +    /* check space for new dmb */
>> +    for_each_clear_bit(sba_idx, ldev->sba_idx_mask, SMC_LO_MAX_DMBS) {
>> +        if (!test_and_set_bit(sba_idx, ldev->sba_idx_mask))
>> +            break;
>> +    }
>> +    if (sba_idx == SMC_LO_MAX_DMBS)
>> +        return -ENOSPC;
>> +
>> +    dmb_node = kzalloc(sizeof(*dmb_node), GFP_KERNEL);
>> +    if (!dmb_node) {
>> +        rc = -ENOMEM;
>> +        goto err_bit;
>> +    }
>> +
>> +    dmb_node->sba_idx = sba_idx;
>> +    order = get_order(dmb->dmb_len);
>> +    pages = alloc_pages(GFP_KERNEL | __GFP_NOWARN |
>> +                __GFP_NOMEMALLOC | __GFP_COMP |
>> +                __GFP_NORETRY | __GFP_ZERO,
>> +                order);
>> +    if (!pages) {
>> +        rc = -ENOMEM;
>> +        goto err_node;
>> +    }
>> +    dmb_node->cpu_addr = (void *)page_address(pages);
>> +    dmb_node->len = dmb->dmb_len;
>> +    dmb_node->dma_addr = SMC_DMA_ADDR_INVALID;
>> +
>> +again:
>> +    /* add new dmb into hash table */
>> +    get_random_bytes(&dmb_node->token, sizeof(dmb_node->token));
>> +    write_lock(&ldev->dmb_ht_lock);
>> +    hash_for_each_possible(ldev->dmb_ht, tmp_node, list, dmb_node->token) {
>> +        if (tmp_node->token == dmb_node->token) {
>> +            write_unlock(&ldev->dmb_ht_lock);
>> +            goto again;
>> +        }
>> +    }
>> +    hash_add(ldev->dmb_ht, &dmb_node->list, dmb_node->token);
>> +    write_unlock(&ldev->dmb_ht_lock);
>> +
> The write_lock_irqsave()/write_unlock_irqrestore() and read_lock_irqsave()/read_unlock_irqrestore() should be used 
> instead of write_lock()/write_unlock() and read_lock()/read_unlock() in order to keep the lock irq-safe.
> 

dmb_ht_lock won't be held in an interrupt or softirq context. The dmb_{register|unregister},
dmb_{attach|detach} and data_move operations all run in process context, so I think write_(un)lock
and read_(un)lock are safe here.

>> +    dmb->sba_idx = dmb_node->sba_idx;
>> +    dmb->dmb_tok = dmb_node->token;
>> +    dmb->cpu_addr = dmb_node->cpu_addr;
>> +    dmb->dma_addr = dmb_node->dma_addr;
>> +    dmb->dmb_len = dmb_node->len;
>> +
>> +    return 0;
>> +
>> +err_node:
>> +    kfree(dmb_node);
>> +err_bit:
>> +    clear_bit(sba_idx, ldev->sba_idx_mask);
>> +    return rc;
>> +}
>> +
>> +static int smc_lo_unregister_dmb(struct smcd_dev *smcd, struct smcd_dmb *dmb)
>> +{
>> +    struct smc_lo_dmb_node *dmb_node = NULL, *tmp_node;
>> +    struct smc_lo_dev *ldev = smcd->priv;
>> +
>> +    /* remove dmb from hash table */
>> +    write_lock(&ldev->dmb_ht_lock);
>> +    hash_for_each_possible(ldev->dmb_ht, tmp_node, list, dmb->dmb_tok) {
>> +        if (tmp_node->token == dmb->dmb_tok) {
>> +            dmb_node = tmp_node;
>> +            break;
>> +        }
>> +    }
>> +    if (!dmb_node) {
>> +        write_unlock(&ldev->dmb_ht_lock);
>> +        return -EINVAL;
>> +    }
>> +    hash_del(&dmb_node->list);
>> +    write_unlock(&ldev->dmb_ht_lock);
>> +
>> +    clear_bit(dmb_node->sba_idx, ldev->sba_idx_mask);
>> +    kfree(dmb_node->cpu_addr);
>> +    kfree(dmb_node);
>> +
>> +    return 0;
>> +}
>> +
>>   static int smc_lo_add_vlan_id(struct smcd_dev *smcd, u64 vlan_id)
>>   {
>>       return -EOPNOTSUPP;
>> @@ -76,6 +169,38 @@ static int smc_lo_signal_event(struct smcd_dev *dev, struct smcd_gid *rgid,
>>       return 0;
>>   }
>> +static int smc_lo_move_data(struct smcd_dev *smcd, u64 dmb_tok,
>> +                unsigned int idx, bool sf, unsigned int offset,
>> +                void *data, unsigned int size)
>> +{
>> +    struct smc_lo_dmb_node *rmb_node = NULL, *tmp_node;
>> +    struct smc_lo_dev *ldev = smcd->priv;
>> +
>> +    read_lock(&ldev->dmb_ht_lock);
>> +    hash_for_each_possible(ldev->dmb_ht, tmp_node, list, dmb_tok) {
>> +        if (tmp_node->token == dmb_tok) {
>> +            rmb_node = tmp_node;
>> +            break;
>> +        }
>> +    }
>> +    if (!rmb_node) {
>> +        read_unlock(&ldev->dmb_ht_lock);
>> +        return -EINVAL;
>> +    }
>> +    read_unlock(&ldev->dmb_ht_lock);
>> +
>> +    memcpy((char *)rmb_node->cpu_addr + offset, data, size);
>> +
> 
> Should this read_unlock() be placed after the memcpy()?
> 

dmb_ht_lock is used to ensure safe access to the DMB hash table of loopback-ism.
The DMB hash table can be accessed by all the connections on loopback-ism, so
it must be protected.

But a given DMB is only used by one connection, and the move_data path is
protected by conn->send_lock (see smcd_tx_sndbuf_nonempty()), so the memcpy() into
rmb_node here is safe and does not race with anything else.
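
For context, a rough sketch of that serialization (simplified, and the
function name below is only for illustration; the real logic lives in
smcd_tx_sndbuf_nonempty() in net/smc/smc_tx.c):

static int smcd_tx_example(struct smc_connection *conn, unsigned int offset,
			   void *data, unsigned int len)
{
	struct smcd_dev *smcd = conn->lgr->smcd;
	int rc;

	/* only one sender per connection can be in this section, which is
	 * what makes the plain memcpy() in smc_lo_move_data() race-free
	 * for a given DMB
	 */
	spin_lock_bh(&conn->send_lock);
	rc = smcd->ops->move_data(smcd, conn->peer_token, 0, false,
				  offset, data, len);
	spin_unlock_bh(&conn->send_lock);
	return rc;
}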

Thanks!

> <...>
Wenjia Zhang Feb. 23, 2024, 2:12 p.m. UTC | #3
On 20.02.24 02:55, Wen Gu wrote:
> 
> 
> On 2024/2/16 22:13, Wenjia Zhang wrote:
>>
>>
>> On 11.01.24 13:00, Wen Gu wrote:
>>> This implements DMB (un)registration and data move operations of
>>> loopback-ism device.
>>>
>>> Signed-off-by: Wen Gu <guwen@linux.alibaba.com>
>>> ---
>>>   net/smc/smc_cdc.c      |   6 ++
>>>   net/smc/smc_cdc.h      |   1 +
>>>   net/smc/smc_loopback.c | 133 ++++++++++++++++++++++++++++++++++++++++-
>>>   net/smc/smc_loopback.h |  13 ++++
>>>   4 files changed, 150 insertions(+), 3 deletions(-)
>>>
>>> diff --git a/net/smc/smc_cdc.c b/net/smc/smc_cdc.c
>>> index 3c06625ceb20..c820ef197610 100644
>>> --- a/net/smc/smc_cdc.c
>>> +++ b/net/smc/smc_cdc.c
>>> @@ -410,6 +410,12 @@ static void smc_cdc_msg_recv(struct smc_sock 
>>> *smc, struct smc_cdc_msg *cdc)
>>>   static void smcd_cdc_rx_tsklet(struct tasklet_struct *t)
>>>   {
>>>       struct smc_connection *conn = from_tasklet(conn, t, rx_tsklet);
>>> +
>>> +    smcd_cdc_rx_handler(conn);
>>> +}
>>> +
>>> +void smcd_cdc_rx_handler(struct smc_connection *conn)
>>> +{
>>>       struct smcd_cdc_msg *data_cdc;
>>>       struct smcd_cdc_msg cdc;
>>>       struct smc_sock *smc;
>>> diff --git a/net/smc/smc_cdc.h b/net/smc/smc_cdc.h
>>> index 696cc11f2303..11559d4ebf2b 100644
>>> --- a/net/smc/smc_cdc.h
>>> +++ b/net/smc/smc_cdc.h
>>> @@ -301,5 +301,6 @@ int smcr_cdc_msg_send_validation(struct 
>>> smc_connection *conn,
>>>                    struct smc_wr_buf *wr_buf);
>>>   int smc_cdc_init(void) __init;
>>>   void smcd_cdc_rx_init(struct smc_connection *conn);
>>> +void smcd_cdc_rx_handler(struct smc_connection *conn);
>>>   #endif /* SMC_CDC_H */
>>> diff --git a/net/smc/smc_loopback.c b/net/smc/smc_loopback.c
>>> index 353d4a2d69a1..f72e7b24fc1a 100644
>>> --- a/net/smc/smc_loopback.c
>>> +++ b/net/smc/smc_loopback.c
>>> @@ -15,11 +15,13 @@
>>>   #include <linux/types.h>
>>>   #include <net/smc.h>
>>> +#include "smc_cdc.h"
>>>   #include "smc_ism.h"
>>>   #include "smc_loopback.h"
>>>   #if IS_ENABLED(CONFIG_SMC_LO)
>>>   #define SMC_LO_V2_CAPABLE    0x1 /* loopback-ism acts as ISMv2 */
>>> +#define SMC_DMA_ADDR_INVALID    (~(dma_addr_t)0)
>>>   static const char smc_lo_dev_name[] = "loopback-ism";
>>>   static struct smc_lo_dev *lo_dev;
>>> @@ -50,6 +52,97 @@ static int smc_lo_query_rgid(struct smcd_dev 
>>> *smcd, struct smcd_gid *rgid,
>>>       return 0;
>>>   }
>>> +static int smc_lo_register_dmb(struct smcd_dev *smcd, struct 
>>> smcd_dmb *dmb,
>>> +                   void *client_priv)
>>> +{
>>> +    struct smc_lo_dmb_node *dmb_node, *tmp_node;
>>> +    struct smc_lo_dev *ldev = smcd->priv;
>>> +    int sba_idx, order, rc;
>>> +    struct page *pages;
>>> +
>>> +    /* check space for new dmb */
>>> +    for_each_clear_bit(sba_idx, ldev->sba_idx_mask, SMC_LO_MAX_DMBS) {
>>> +        if (!test_and_set_bit(sba_idx, ldev->sba_idx_mask))
>>> +            break;
>>> +    }
>>> +    if (sba_idx == SMC_LO_MAX_DMBS)
>>> +        return -ENOSPC;
>>> +
>>> +    dmb_node = kzalloc(sizeof(*dmb_node), GFP_KERNEL);
>>> +    if (!dmb_node) {
>>> +        rc = -ENOMEM;
>>> +        goto err_bit;
>>> +    }
>>> +
>>> +    dmb_node->sba_idx = sba_idx;
>>> +    order = get_order(dmb->dmb_len);
>>> +    pages = alloc_pages(GFP_KERNEL | __GFP_NOWARN |
>>> +                __GFP_NOMEMALLOC | __GFP_COMP |
>>> +                __GFP_NORETRY | __GFP_ZERO,
>>> +                order);
>>> +    if (!pages) {
>>> +        rc = -ENOMEM;
>>> +        goto err_node;
>>> +    }
>>> +    dmb_node->cpu_addr = (void *)page_address(pages);
>>> +    dmb_node->len = dmb->dmb_len;
>>> +    dmb_node->dma_addr = SMC_DMA_ADDR_INVALID;
>>> +
>>> +again:
>>> +    /* add new dmb into hash table */
>>> +    get_random_bytes(&dmb_node->token, sizeof(dmb_node->token));
>>> +    write_lock(&ldev->dmb_ht_lock);
>>> +    hash_for_each_possible(ldev->dmb_ht, tmp_node, list, 
>>> dmb_node->token) {
>>> +        if (tmp_node->token == dmb_node->token) {
>>> +            write_unlock(&ldev->dmb_ht_lock);
>>> +            goto again;
>>> +        }
>>> +    }
>>> +    hash_add(ldev->dmb_ht, &dmb_node->list, dmb_node->token);
>>> +    write_unlock(&ldev->dmb_ht_lock);
>>> +
>> The write_lock_irqsave()/write_unlock_irqrestore() and 
>> read_lock_irqsave()/read_unlock_irqrestore() should be used instead of 
>> write_lock()/write_unlock() and read_lock()/read_unlock() in order to 
>> keep the lock irq-safe.
>>
> 
> dmb_ht_lock won't be held in an interrupt or softirq context. The
> dmb_{register|unregister}, dmb_{attach|detach} and data_move operations
> all run in process context, so I think write_(un)lock and read_(un)lock
> are safe here.

Right, it is not directly held in an interrupt context, but it has a
dependency on conn->send_lock, as you wrote below, which requires an
irq-safe lock. And this matches our findings from a test:

=====================================================
WARNING: SOFTIRQ-safe -> SOFTIRQ-unsafe lock order detected
6.8.0-rc4-00787-g8eb4d2392609 #2 Not tainted
-----------------------------------------------------
smcapp/33802 [HC0[0]:SC0[2]:HE1:SE0] is trying to acquire:
00000000a2fc0330 (&ldev->dmb_ht_lock){++++}-{2:2}, at: 
smc_lo_move_data+0x84/0x1d0 [>
and this task is already holding:
00000000e4df6f28 (&smc->conn.send_lock){+.-.}-{2:2}, at: 
smc_tx_sndbuf_nonempty+0xaa>
which would create a new lock dependency:
(&smc->conn.send_lock){+.-.}-{2:2} -> (&ldev->dmb_ht_lock){++++}-{2:2}
but this new dependency connects a SOFTIRQ-irq-safe lock:
(&smc->conn.send_lock){+.-.}-{2:2}

> 
>>> +    dmb->sba_idx = dmb_node->sba_idx;
>>> +    dmb->dmb_tok = dmb_node->token;
>>> +    dmb->cpu_addr = dmb_node->cpu_addr;
>>> +    dmb->dma_addr = dmb_node->dma_addr;
>>> +    dmb->dmb_len = dmb_node->len;
>>> +
>>> +    return 0;
>>> +
>>> +err_node:
>>> +    kfree(dmb_node);
>>> +err_bit:
>>> +    clear_bit(sba_idx, ldev->sba_idx_mask);
>>> +    return rc;
>>> +}
>>> +
>>> +static int smc_lo_unregister_dmb(struct smcd_dev *smcd, struct 
>>> smcd_dmb *dmb)
>>> +{
>>> +    struct smc_lo_dmb_node *dmb_node = NULL, *tmp_node;
>>> +    struct smc_lo_dev *ldev = smcd->priv;
>>> +
>>> +    /* remove dmb from hash table */
>>> +    write_lock(&ldev->dmb_ht_lock);
>>> +    hash_for_each_possible(ldev->dmb_ht, tmp_node, list, 
>>> dmb->dmb_tok) {
>>> +        if (tmp_node->token == dmb->dmb_tok) {
>>> +            dmb_node = tmp_node;
>>> +            break;
>>> +        }
>>> +    }
>>> +    if (!dmb_node) {
>>> +        write_unlock(&ldev->dmb_ht_lock);
>>> +        return -EINVAL;
>>> +    }
>>> +    hash_del(&dmb_node->list);
>>> +    write_unlock(&ldev->dmb_ht_lock);
>>> +
>>> +    clear_bit(dmb_node->sba_idx, ldev->sba_idx_mask);
>>> +    kfree(dmb_node->cpu_addr);
>>> +    kfree(dmb_node);
>>> +
>>> +    return 0;
>>> +}
>>> +
>>>   static int smc_lo_add_vlan_id(struct smcd_dev *smcd, u64 vlan_id)
>>>   {
>>>       return -EOPNOTSUPP;
>>> @@ -76,6 +169,38 @@ static int smc_lo_signal_event(struct smcd_dev 
>>> *dev, struct smcd_gid *rgid,
>>>       return 0;
>>>   }
>>> +static int smc_lo_move_data(struct smcd_dev *smcd, u64 dmb_tok,
>>> +                unsigned int idx, bool sf, unsigned int offset,
>>> +                void *data, unsigned int size)
>>> +{
>>> +    struct smc_lo_dmb_node *rmb_node = NULL, *tmp_node;
>>> +    struct smc_lo_dev *ldev = smcd->priv;
>>> +
>>> +    read_lock(&ldev->dmb_ht_lock);
>>> +    hash_for_each_possible(ldev->dmb_ht, tmp_node, list, dmb_tok) {
>>> +        if (tmp_node->token == dmb_tok) {
>>> +            rmb_node = tmp_node;
>>> +            break;
>>> +        }
>>> +    }
>>> +    if (!rmb_node) {
>>> +        read_unlock(&ldev->dmb_ht_lock);
>>> +        return -EINVAL;
>>> +    }
>>> +    read_unlock(&ldev->dmb_ht_lock);
>>> +
>>> +    memcpy((char *)rmb_node->cpu_addr + offset, data, size);
>>> +
>>
>> Should this read_unlock() be placed after the memcpy()?
>>
> 
> dmb_ht_lock is used to ensure safe access to the DMB hash table of
> loopback-ism. The DMB hash table can be accessed by all the connections
> on loopback-ism, so it must be protected.
> 
> But a given DMB is only used by one connection, and the move_data path
> is protected by conn->send_lock (see smcd_tx_sndbuf_nonempty()), so the
> memcpy() into rmb_node here is safe and does not race with anything else.
> 
> Thanks!
> 
sounds reasonable.
>> <...>
Wen Gu Feb. 26, 2024, 3:04 a.m. UTC | #4
On 2024/2/23 22:12, Wenjia Zhang wrote:
> 
> 
> On 20.02.24 02:55, Wen Gu wrote:
>>
>>
>> On 2024/2/16 22:13, Wenjia Zhang wrote:
>>>
>>>
>>> On 11.01.24 13:00, Wen Gu wrote:
>>>> This implements DMB (un)registration and data move operations of
>>>> loopback-ism device.
>>>>
>>>> Signed-off-by: Wen Gu <guwen@linux.alibaba.com>
>>>> ---
>>>>   net/smc/smc_cdc.c      |   6 ++
>>>>   net/smc/smc_cdc.h      |   1 +
>>>>   net/smc/smc_loopback.c | 133 ++++++++++++++++++++++++++++++++++++++++-
>>>>   net/smc/smc_loopback.h |  13 ++++
>>>>   4 files changed, 150 insertions(+), 3 deletions(-)
>>>>
>>>> diff --git a/net/smc/smc_cdc.c b/net/smc/smc_cdc.c
>>>> index 3c06625ceb20..c820ef197610 100644
>>>> --- a/net/smc/smc_cdc.c
>>>> +++ b/net/smc/smc_cdc.c
>>>> @@ -410,6 +410,12 @@ static void smc_cdc_msg_recv(struct smc_sock *smc, struct smc_cdc_msg *cdc)
>>>>   static void smcd_cdc_rx_tsklet(struct tasklet_struct *t)
>>>>   {
>>>>       struct smc_connection *conn = from_tasklet(conn, t, rx_tsklet);
>>>> +
>>>> +    smcd_cdc_rx_handler(conn);
>>>> +}
>>>> +
>>>> +void smcd_cdc_rx_handler(struct smc_connection *conn)
>>>> +{
>>>>       struct smcd_cdc_msg *data_cdc;
>>>>       struct smcd_cdc_msg cdc;
>>>>       struct smc_sock *smc;
>>>> diff --git a/net/smc/smc_cdc.h b/net/smc/smc_cdc.h
>>>> index 696cc11f2303..11559d4ebf2b 100644
>>>> --- a/net/smc/smc_cdc.h
>>>> +++ b/net/smc/smc_cdc.h
>>>> @@ -301,5 +301,6 @@ int smcr_cdc_msg_send_validation(struct smc_connection *conn,
>>>>                    struct smc_wr_buf *wr_buf);
>>>>   int smc_cdc_init(void) __init;
>>>>   void smcd_cdc_rx_init(struct smc_connection *conn);
>>>> +void smcd_cdc_rx_handler(struct smc_connection *conn);
>>>>   #endif /* SMC_CDC_H */
>>>> diff --git a/net/smc/smc_loopback.c b/net/smc/smc_loopback.c
>>>> index 353d4a2d69a1..f72e7b24fc1a 100644
>>>> --- a/net/smc/smc_loopback.c
>>>> +++ b/net/smc/smc_loopback.c
>>>> @@ -15,11 +15,13 @@
>>>>   #include <linux/types.h>
>>>>   #include <net/smc.h>
>>>> +#include "smc_cdc.h"
>>>>   #include "smc_ism.h"
>>>>   #include "smc_loopback.h"
>>>>   #if IS_ENABLED(CONFIG_SMC_LO)
>>>>   #define SMC_LO_V2_CAPABLE    0x1 /* loopback-ism acts as ISMv2 */
>>>> +#define SMC_DMA_ADDR_INVALID    (~(dma_addr_t)0)
>>>>   static const char smc_lo_dev_name[] = "loopback-ism";
>>>>   static struct smc_lo_dev *lo_dev;
>>>> @@ -50,6 +52,97 @@ static int smc_lo_query_rgid(struct smcd_dev *smcd, struct smcd_gid *rgid,
>>>>       return 0;
>>>>   }
>>>> +static int smc_lo_register_dmb(struct smcd_dev *smcd, struct smcd_dmb *dmb,
>>>> +                   void *client_priv)
>>>> +{
>>>> +    struct smc_lo_dmb_node *dmb_node, *tmp_node;
>>>> +    struct smc_lo_dev *ldev = smcd->priv;
>>>> +    int sba_idx, order, rc;
>>>> +    struct page *pages;
>>>> +
>>>> +    /* check space for new dmb */
>>>> +    for_each_clear_bit(sba_idx, ldev->sba_idx_mask, SMC_LO_MAX_DMBS) {
>>>> +        if (!test_and_set_bit(sba_idx, ldev->sba_idx_mask))
>>>> +            break;
>>>> +    }
>>>> +    if (sba_idx == SMC_LO_MAX_DMBS)
>>>> +        return -ENOSPC;
>>>> +
>>>> +    dmb_node = kzalloc(sizeof(*dmb_node), GFP_KERNEL);
>>>> +    if (!dmb_node) {
>>>> +        rc = -ENOMEM;
>>>> +        goto err_bit;
>>>> +    }
>>>> +
>>>> +    dmb_node->sba_idx = sba_idx;
>>>> +    order = get_order(dmb->dmb_len);
>>>> +    pages = alloc_pages(GFP_KERNEL | __GFP_NOWARN |
>>>> +                __GFP_NOMEMALLOC | __GFP_COMP |
>>>> +                __GFP_NORETRY | __GFP_ZERO,
>>>> +                order);
>>>> +    if (!pages) {
>>>> +        rc = -ENOMEM;
>>>> +        goto err_node;
>>>> +    }
>>>> +    dmb_node->cpu_addr = (void *)page_address(pages);
>>>> +    dmb_node->len = dmb->dmb_len;
>>>> +    dmb_node->dma_addr = SMC_DMA_ADDR_INVALID;
>>>> +
>>>> +again:
>>>> +    /* add new dmb into hash table */
>>>> +    get_random_bytes(&dmb_node->token, sizeof(dmb_node->token));
>>>> +    write_lock(&ldev->dmb_ht_lock);
>>>> +    hash_for_each_possible(ldev->dmb_ht, tmp_node, list, dmb_node->token) {
>>>> +        if (tmp_node->token == dmb_node->token) {
>>>> +            write_unlock(&ldev->dmb_ht_lock);
>>>> +            goto again;
>>>> +        }
>>>> +    }
>>>> +    hash_add(ldev->dmb_ht, &dmb_node->list, dmb_node->token);
>>>> +    write_unlock(&ldev->dmb_ht_lock);
>>>> +
>>> The write_lock_irqsave()/write_unlock_irqrestore() and read_lock_irqsave()/read_unlock_irqrestore() should be used 
>>> instead of write_lock()/write_unlock() and read_lock()/read_unlock() in order to keep the lock irq-safe.
>>>
>>
>> dmb_ht_lock won't be held in an interrupt or softirq context. The dmb_{register|unregister},
>> dmb_{attach|detach} and data_move operations all run in process context, so I think write_(un)lock
>> and read_(un)lock are safe here.
> 
> Right, it is not directly held in an interrupt context, but it has a dependency on conn->send_lock, as you wrote below, 
> which requires an irq-safe lock. And this matches our findings from a test:
> 
> =====================================================
> WARNING: SOFTIRQ-safe -> SOFTIRQ-unsafe lock order detected
> 6.8.0-rc4-00787-g8eb4d2392609 #2 Not tainted
> -----------------------------------------------------
> smcapp/33802 [HC0[0]:SC0[2]:HE1:SE0] is trying to acquire:
> 00000000a2fc0330 (&ldev->dmb_ht_lock){++++}-{2:2}, at: smc_lo_move_data+0x84/0x1d0 [>
> and this task is already holding:
> 00000000e4df6f28 (&smc->conn.send_lock){+.-.}-{2:2}, at: smc_tx_sndbuf_nonempty+0xaa>
> which would create a new lock dependency:
> (&smc->conn.send_lock){+.-.}-{2:2} -> (&ldev->dmb_ht_lock){++++}-{2:2}
> but this new dependency connects a SOFTIRQ-irq-safe lock:
> (&smc->conn.send_lock){+.-.}-{2:2}
> 

I understand, thank you Wenjia. I will fix it in the next version.
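
For reference, a minimal sketch of what the irq-safe lookup could look like
(illustrative only; the helper name is made up and the actual next version
may differ):

static struct smc_lo_dmb_node *smc_lo_dmb_ht_find(struct smc_lo_dev *ldev,
						  u64 dmb_tok)
{
	struct smc_lo_dmb_node *tmp_node, *found = NULL;
	unsigned long flags;

	read_lock_irqsave(&ldev->dmb_ht_lock, flags);
	hash_for_each_possible(ldev->dmb_ht, tmp_node, list, dmb_tok) {
		if (tmp_node->token == dmb_tok) {
			found = tmp_node;
			break;
		}
	}
	read_unlock_irqrestore(&ldev->dmb_ht_lock, flags);
	return found;
}

smc_lo_move_data() could then look up the DMB via such a helper and keep the
memcpy() outside the lock, as discussed above.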

>>
>>>> +    dmb->sba_idx = dmb_node->sba_idx;
>>>> +    dmb->dmb_tok = dmb_node->token;
>>>> +    dmb->cpu_addr = dmb_node->cpu_addr;
>>>> +    dmb->dma_addr = dmb_node->dma_addr;
>>>> +    dmb->dmb_len = dmb_node->len;
>>>> +
>>>> +    return 0;
>>>> +
>>>> +err_node:
>>>> +    kfree(dmb_node);
>>>> +err_bit:
>>>> +    clear_bit(sba_idx, ldev->sba_idx_mask);
>>>> +    return rc;
>>>> +}
>>>> +
>>>> +static int smc_lo_unregister_dmb(struct smcd_dev *smcd, struct smcd_dmb *dmb)
>>>> +{
>>>> +    struct smc_lo_dmb_node *dmb_node = NULL, *tmp_node;
>>>> +    struct smc_lo_dev *ldev = smcd->priv;
>>>> +
>>>> +    /* remove dmb from hash table */
>>>> +    write_lock(&ldev->dmb_ht_lock);
>>>> +    hash_for_each_possible(ldev->dmb_ht, tmp_node, list, dmb->dmb_tok) {
>>>> +        if (tmp_node->token == dmb->dmb_tok) {
>>>> +            dmb_node = tmp_node;
>>>> +            break;
>>>> +        }
>>>> +    }
>>>> +    if (!dmb_node) {
>>>> +        write_unlock(&ldev->dmb_ht_lock);
>>>> +        return -EINVAL;
>>>> +    }
>>>> +    hash_del(&dmb_node->list);
>>>> +    write_unlock(&ldev->dmb_ht_lock);
>>>> +
>>>> +    clear_bit(dmb_node->sba_idx, ldev->sba_idx_mask);
>>>> +    kfree(dmb_node->cpu_addr);
>>>> +    kfree(dmb_node);
>>>> +
>>>> +    return 0;
>>>> +}
>>>> +
>>>>   static int smc_lo_add_vlan_id(struct smcd_dev *smcd, u64 vlan_id)
>>>>   {
>>>>       return -EOPNOTSUPP;
>>>> @@ -76,6 +169,38 @@ static int smc_lo_signal_event(struct smcd_dev *dev, struct smcd_gid *rgid,
>>>>       return 0;
>>>>   }
>>>> +static int smc_lo_move_data(struct smcd_dev *smcd, u64 dmb_tok,
>>>> +                unsigned int idx, bool sf, unsigned int offset,
>>>> +                void *data, unsigned int size)
>>>> +{
>>>> +    struct smc_lo_dmb_node *rmb_node = NULL, *tmp_node;
>>>> +    struct smc_lo_dev *ldev = smcd->priv;
>>>> +
>>>> +    read_lock(&ldev->dmb_ht_lock);
>>>> +    hash_for_each_possible(ldev->dmb_ht, tmp_node, list, dmb_tok) {
>>>> +        if (tmp_node->token == dmb_tok) {
>>>> +            rmb_node = tmp_node;
>>>> +            break;
>>>> +        }
>>>> +    }
>>>> +    if (!rmb_node) {
>>>> +        read_unlock(&ldev->dmb_ht_lock);
>>>> +        return -EINVAL;
>>>> +    }
>>>> +    read_unlock(&ldev->dmb_ht_lock);
>>>> +
>>>> +    memcpy((char *)rmb_node->cpu_addr + offset, data, size);
>>>> +
>>>
>>> Should this read_unlock() be placed after the memcpy()?
>>>
>>
>> dmb_ht_lock is used to ensure safe access to the DMB hash table of loopback-ism.
>> The DMB hash table can be accessed by all the connections on loopback-ism, so
>> it must be protected.
>>
>> But a given DMB is only used by one connection, and the move_data path is
>> protected by conn->send_lock (see smcd_tx_sndbuf_nonempty()), so the memcpy() into
>> rmb_node here is safe and does not race with anything else.
>>
>> Thanks!
>>
> sounds reasonable.
>>> <...>

Patch

diff --git a/net/smc/smc_cdc.c b/net/smc/smc_cdc.c
index 3c06625ceb20..c820ef197610 100644
--- a/net/smc/smc_cdc.c
+++ b/net/smc/smc_cdc.c
@@ -410,6 +410,12 @@  static void smc_cdc_msg_recv(struct smc_sock *smc, struct smc_cdc_msg *cdc)
 static void smcd_cdc_rx_tsklet(struct tasklet_struct *t)
 {
 	struct smc_connection *conn = from_tasklet(conn, t, rx_tsklet);
+
+	smcd_cdc_rx_handler(conn);
+}
+
+void smcd_cdc_rx_handler(struct smc_connection *conn)
+{
 	struct smcd_cdc_msg *data_cdc;
 	struct smcd_cdc_msg cdc;
 	struct smc_sock *smc;
diff --git a/net/smc/smc_cdc.h b/net/smc/smc_cdc.h
index 696cc11f2303..11559d4ebf2b 100644
--- a/net/smc/smc_cdc.h
+++ b/net/smc/smc_cdc.h
@@ -301,5 +301,6 @@  int smcr_cdc_msg_send_validation(struct smc_connection *conn,
 				 struct smc_wr_buf *wr_buf);
 int smc_cdc_init(void) __init;
 void smcd_cdc_rx_init(struct smc_connection *conn);
+void smcd_cdc_rx_handler(struct smc_connection *conn);
 
 #endif /* SMC_CDC_H */
diff --git a/net/smc/smc_loopback.c b/net/smc/smc_loopback.c
index 353d4a2d69a1..f72e7b24fc1a 100644
--- a/net/smc/smc_loopback.c
+++ b/net/smc/smc_loopback.c
@@ -15,11 +15,13 @@ 
 #include <linux/types.h>
 #include <net/smc.h>
 
+#include "smc_cdc.h"
 #include "smc_ism.h"
 #include "smc_loopback.h"
 
 #if IS_ENABLED(CONFIG_SMC_LO)
 #define SMC_LO_V2_CAPABLE	0x1 /* loopback-ism acts as ISMv2 */
+#define SMC_DMA_ADDR_INVALID	(~(dma_addr_t)0)
 
 static const char smc_lo_dev_name[] = "loopback-ism";
 static struct smc_lo_dev *lo_dev;
@@ -50,6 +52,97 @@  static int smc_lo_query_rgid(struct smcd_dev *smcd, struct smcd_gid *rgid,
 	return 0;
 }
 
+static int smc_lo_register_dmb(struct smcd_dev *smcd, struct smcd_dmb *dmb,
+			       void *client_priv)
+{
+	struct smc_lo_dmb_node *dmb_node, *tmp_node;
+	struct smc_lo_dev *ldev = smcd->priv;
+	int sba_idx, order, rc;
+	struct page *pages;
+
+	/* check space for new dmb */
+	for_each_clear_bit(sba_idx, ldev->sba_idx_mask, SMC_LO_MAX_DMBS) {
+		if (!test_and_set_bit(sba_idx, ldev->sba_idx_mask))
+			break;
+	}
+	if (sba_idx == SMC_LO_MAX_DMBS)
+		return -ENOSPC;
+
+	dmb_node = kzalloc(sizeof(*dmb_node), GFP_KERNEL);
+	if (!dmb_node) {
+		rc = -ENOMEM;
+		goto err_bit;
+	}
+
+	dmb_node->sba_idx = sba_idx;
+	order = get_order(dmb->dmb_len);
+	pages = alloc_pages(GFP_KERNEL | __GFP_NOWARN |
+			    __GFP_NOMEMALLOC | __GFP_COMP |
+			    __GFP_NORETRY | __GFP_ZERO,
+			    order);
+	if (!pages) {
+		rc = -ENOMEM;
+		goto err_node;
+	}
+	dmb_node->cpu_addr = (void *)page_address(pages);
+	dmb_node->len = dmb->dmb_len;
+	dmb_node->dma_addr = SMC_DMA_ADDR_INVALID;
+
+again:
+	/* add new dmb into hash table */
+	get_random_bytes(&dmb_node->token, sizeof(dmb_node->token));
+	write_lock(&ldev->dmb_ht_lock);
+	hash_for_each_possible(ldev->dmb_ht, tmp_node, list, dmb_node->token) {
+		if (tmp_node->token == dmb_node->token) {
+			write_unlock(&ldev->dmb_ht_lock);
+			goto again;
+		}
+	}
+	hash_add(ldev->dmb_ht, &dmb_node->list, dmb_node->token);
+	write_unlock(&ldev->dmb_ht_lock);
+
+	dmb->sba_idx = dmb_node->sba_idx;
+	dmb->dmb_tok = dmb_node->token;
+	dmb->cpu_addr = dmb_node->cpu_addr;
+	dmb->dma_addr = dmb_node->dma_addr;
+	dmb->dmb_len = dmb_node->len;
+
+	return 0;
+
+err_node:
+	kfree(dmb_node);
+err_bit:
+	clear_bit(sba_idx, ldev->sba_idx_mask);
+	return rc;
+}
+
+static int smc_lo_unregister_dmb(struct smcd_dev *smcd, struct smcd_dmb *dmb)
+{
+	struct smc_lo_dmb_node *dmb_node = NULL, *tmp_node;
+	struct smc_lo_dev *ldev = smcd->priv;
+
+	/* remove dmb from hash table */
+	write_lock(&ldev->dmb_ht_lock);
+	hash_for_each_possible(ldev->dmb_ht, tmp_node, list, dmb->dmb_tok) {
+		if (tmp_node->token == dmb->dmb_tok) {
+			dmb_node = tmp_node;
+			break;
+		}
+	}
+	if (!dmb_node) {
+		write_unlock(&ldev->dmb_ht_lock);
+		return -EINVAL;
+	}
+	hash_del(&dmb_node->list);
+	write_unlock(&ldev->dmb_ht_lock);
+
+	clear_bit(dmb_node->sba_idx, ldev->sba_idx_mask);
+	kfree(dmb_node->cpu_addr);
+	kfree(dmb_node);
+
+	return 0;
+}
+
 static int smc_lo_add_vlan_id(struct smcd_dev *smcd, u64 vlan_id)
 {
 	return -EOPNOTSUPP;
@@ -76,6 +169,38 @@  static int smc_lo_signal_event(struct smcd_dev *dev, struct smcd_gid *rgid,
 	return 0;
 }
 
+static int smc_lo_move_data(struct smcd_dev *smcd, u64 dmb_tok,
+			    unsigned int idx, bool sf, unsigned int offset,
+			    void *data, unsigned int size)
+{
+	struct smc_lo_dmb_node *rmb_node = NULL, *tmp_node;
+	struct smc_lo_dev *ldev = smcd->priv;
+
+	read_lock(&ldev->dmb_ht_lock);
+	hash_for_each_possible(ldev->dmb_ht, tmp_node, list, dmb_tok) {
+		if (tmp_node->token == dmb_tok) {
+			rmb_node = tmp_node;
+			break;
+		}
+	}
+	if (!rmb_node) {
+		read_unlock(&ldev->dmb_ht_lock);
+		return -EINVAL;
+	}
+	read_unlock(&ldev->dmb_ht_lock);
+
+	memcpy((char *)rmb_node->cpu_addr + offset, data, size);
+
+	if (sf) {
+		struct smc_connection *conn =
+			smcd->conn[rmb_node->sba_idx];
+
+		if (conn && !conn->killed)
+			smcd_cdc_rx_handler(conn);
+	}
+	return 0;
+}
+
 static int smc_lo_supports_v2(void)
 {
 	return SMC_LO_V2_CAPABLE;
@@ -102,14 +227,14 @@  static struct device *smc_lo_get_dev(struct smcd_dev *smcd)
 
 static const struct smcd_ops lo_ops = {
 	.query_remote_gid = smc_lo_query_rgid,
-	.register_dmb		= NULL,
-	.unregister_dmb		= NULL,
+	.register_dmb = smc_lo_register_dmb,
+	.unregister_dmb = smc_lo_unregister_dmb,
 	.add_vlan_id = smc_lo_add_vlan_id,
 	.del_vlan_id = smc_lo_del_vlan_id,
 	.set_vlan_required = smc_lo_set_vlan_required,
 	.reset_vlan_required = smc_lo_reset_vlan_required,
 	.signal_event = smc_lo_signal_event,
-	.move_data		= NULL,
+	.move_data = smc_lo_move_data,
 	.supports_v2 = smc_lo_supports_v2,
 	.get_local_gid = smc_lo_get_local_gid,
 	.get_chid = smc_lo_get_chid,
@@ -174,6 +299,8 @@  static void smcd_lo_unregister_dev(struct smc_lo_dev *ldev)
 static int smc_lo_dev_init(struct smc_lo_dev *ldev)
 {
 	smc_lo_generate_id(ldev);
+	rwlock_init(&ldev->dmb_ht_lock);
+	hash_init(ldev->dmb_ht);
 	return smcd_lo_register_dev(ldev);
 }
 
diff --git a/net/smc/smc_loopback.h b/net/smc/smc_loopback.h
index 55b41133a97f..24ab9d747613 100644
--- a/net/smc/smc_loopback.h
+++ b/net/smc/smc_loopback.h
@@ -20,13 +20,26 @@ 
 
 #if IS_ENABLED(CONFIG_SMC_LO)
 #define SMC_LO_MAX_DMBS		5000
+#define SMC_LO_DMBS_HASH_BITS	12
 #define SMC_LO_CHID		0xFFFF
 
+struct smc_lo_dmb_node {
+	struct hlist_node list;
+	u64 token;
+	u32 len;
+	u32 sba_idx;
+	void *cpu_addr;
+	dma_addr_t dma_addr;
+};
+
 struct smc_lo_dev {
 	struct smcd_dev *smcd;
 	struct device dev;
 	u16 chid;
 	struct smcd_gid local_gid;
+	rwlock_t dmb_ht_lock;
+	DECLARE_BITMAP(sba_idx_mask, SMC_LO_MAX_DMBS);
+	DECLARE_HASHTABLE(dmb_ht, SMC_LO_DMBS_HASH_BITS);
 };
 #endif
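
For completeness, a minimal sketch of how a client of the smcd_ops table would
exercise the new callbacks (illustrative only; the function name is made up,
the real callers live in net/smc/smc_ism.c):

static int example_register_and_drop_dmb(struct smcd_dev *smcd, int buf_size,
					 void *client_priv)
{
	struct smcd_dmb dmb;
	int rc;

	memset(&dmb, 0, sizeof(dmb));
	dmb.dmb_len = buf_size;
	rc = smcd->ops->register_dmb(smcd, &dmb, client_priv);
	if (rc)
		return rc;
	/* dmb.dmb_tok, dmb.cpu_addr and dmb.sba_idx are now filled in */
	return smcd->ops->unregister_dmb(smcd, &dmb);
}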