From patchwork Mon Sep 20 18:13:33 2010 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Arlin Davis X-Patchwork-Id: 195552 Received: from vger.kernel.org (vger.kernel.org [209.132.180.67]) by demeter1.kernel.org (8.14.4/8.14.3) with ESMTP id o8KIDfhf006436 for ; Mon, 20 Sep 2010 18:13:41 GMT Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S1754120Ab0ITSNk (ORCPT ); Mon, 20 Sep 2010 14:13:40 -0400 Received: from mga03.intel.com ([143.182.124.21]:22489 "EHLO mga03.intel.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1753593Ab0ITSNj convert rfc822-to-8bit (ORCPT ); Mon, 20 Sep 2010 14:13:39 -0400 Received: from azsmga001.ch.intel.com ([10.2.17.19]) by azsmga101.ch.intel.com with ESMTP; 20 Sep 2010 11:13:34 -0700 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="4.56,395,1280732400"; d="scan'208";a="326658492" Received: from orsmsx603.amr.corp.intel.com ([10.22.226.49]) by azsmga001.ch.intel.com with ESMTP; 20 Sep 2010 11:13:34 -0700 Received: from orsmsx506.amr.corp.intel.com ([10.22.226.44]) by orsmsx603.amr.corp.intel.com ([10.22.226.49]) with mapi; Mon, 20 Sep 2010 11:13:34 -0700 From: "Davis, Arlin R" To: linux-rdma , "ofw@lists.openfabrics.org" CC: "Smith, Stan" Date: Mon, 20 Sep 2010 11:13:33 -0700 Subject: [PATCH] uDAPL v2.0 - common: restructure EVD processing to handle EP destruction phase and overflow conditions Thread-Topic: [PATCH] uDAPL v2.0 - common: restructure EVD processing to handle EP destruction phase and overflow conditions Thread-Index: ActY74kogZq+6B8uQDaVLvjAFMUTtg== Message-ID: Accept-Language: en-US Content-Language: en-US X-MS-Has-Attach: X-MS-TNEF-Correlator: acceptlanguage: en-US MIME-Version: 1.0 Sender: linux-rdma-owner@vger.kernel.org Precedence: bulk List-ID: X-Mailing-List: linux-rdma@vger.kernel.org X-Greylist: IP, sender and recipient auto-whitelisted, not delayed by milter-greylist-4.2.3 (demeter1.kernel.org [140.211.167.41]); Mon, 
20 Sep 2010 18:13:42 +0000 (UTC) diff --git a/dapl/common/dapl_cno_util.c b/dapl/common/dapl_cno_util.c index 2215f29..cad9747 100755 --- a/dapl/common/dapl_cno_util.c +++ b/dapl/common/dapl_cno_util.c @@ -148,9 +148,6 @@ void dapl_cno_dealloc(IN DAPL_CNO * cno_ptr) void dapl_internal_cno_trigger(IN DAPL_CNO * cno_ptr, IN DAPL_EVD * evd_ptr) { DAT_RETURN dat_status; -#if defined(__KDAPL__) - DAT_EVENT event; -#endif /* defined(__KDAPL__) */ dat_status = DAT_SUCCESS; @@ -167,20 +164,14 @@ void dapl_internal_cno_trigger(IN DAPL_CNO * cno_ptr, IN DAPL_EVD * evd_ptr) dapl_os_assert(cno_ptr->cno_state != DAPL_CNO_STATE_DEAD); if (cno_ptr->cno_state == DAPL_CNO_STATE_UNTRIGGERED) { -#if !defined(__KDAPL__) DAT_OS_WAIT_PROXY_AGENT agent; /* Squirrel away wait agent, and delete link. */ agent = cno_ptr->cno_wait_agent; -#endif /* !defined(__KDAPL__) */ /* Separate assignments for windows compiler. */ #ifndef _WIN32 -#if defined(__KDAPL__) - cno_ptr->cno_upcall = DAT_UPCALL_NULL; -#else cno_ptr->cno_wait_agent = DAT_OS_WAIT_PROXY_AGENT_NULL; -#endif /* defined(__KDAPL__) */ #else cno_ptr->cno_wait_agent.instance_data = NULL; cno_ptr->cno_wait_agent.proxy_agent_func = NULL; @@ -200,43 +191,12 @@ void dapl_internal_cno_trigger(IN DAPL_CNO * cno_ptr, IN DAPL_EVD * evd_ptr) dapl_os_unlock(&cno_ptr->header.lock); /* Trigger the OS proxy wait agent, if one exists. */ -#if defined(__KDAPL__) - dat_status = dapl_evd_dequeue((DAT_EVD_HANDLE) evd_ptr, &event); - while (dat_status == DAT_SUCCESS) { - if (cno_ptr->cno_upcall.upcall_func != - (DAT_UPCALL_FUNC) NULL) { - cno_ptr->cno_upcall.upcall_func(cno_ptr-> - cno_upcall. 
- instance_data, - &event, - DAT_FALSE); - } - dat_status = dapl_evd_dequeue((DAT_EVD_HANDLE) evd_ptr, - &event); - } -#else if (agent.proxy_agent_func != (DAT_AGENT_FUNC) NULL) { agent.proxy_agent_func(agent.instance_data, (DAT_EVD_HANDLE) evd_ptr); } -#endif /* defined(__KDAPL__) */ } else { dapl_os_unlock(&cno_ptr->header.lock); -#if defined(__KDAPL__) - dat_status = dapl_evd_dequeue((DAT_EVD_HANDLE) evd_ptr, &event); - while (dat_status == DAT_SUCCESS) { - if (cno_ptr->cno_upcall.upcall_func != - (DAT_UPCALL_FUNC) NULL) { - cno_ptr->cno_upcall.upcall_func(cno_ptr-> - cno_upcall. - instance_data, - &event, - DAT_FALSE); - } - dat_status = dapl_evd_dequeue((DAT_EVD_HANDLE) evd_ptr, - &event); - } -#endif /* defined(__KDAPL__) */ } return; diff --git a/dapl/common/dapl_ep_disconnect.c b/dapl/common/dapl_ep_disconnect.c index 72da620..90748b0 100644 --- a/dapl/common/dapl_ep_disconnect.c +++ b/dapl/common/dapl_ep_disconnect.c @@ -165,6 +165,7 @@ dapl_ep_disconnect(IN DAT_EP_HANDLE ep_handle, } dapl_os_unlock(&ep_ptr->header.lock); dat_status = dapls_ib_disconnect(ep_ptr, disconnect_flags); + dapls_ep_flush_cqs(ep_ptr); bail: dapl_dbg_log(DAPL_DBG_TYPE_RTN | DAPL_DBG_TYPE_CM, diff --git a/dapl/common/dapl_ep_free.c b/dapl/common/dapl_ep_free.c index 3bfc541..32d50cc 100644 --- a/dapl/common/dapl_ep_free.c +++ b/dapl/common/dapl_ep_free.c @@ -202,6 +202,8 @@ DAT_RETURN DAT_API dapl_ep_free(IN DAT_EP_HANDLE ep_handle) } } + dapls_ep_flush_cqs(ep_ptr); + /* Free the resource */ dapl_ep_dealloc(ep_ptr); diff --git a/dapl/common/dapl_ep_util.c b/dapl/common/dapl_ep_util.c index 9aff242..fc911a6 100644 --- a/dapl/common/dapl_ep_util.c +++ b/dapl/common/dapl_ep_util.c @@ -606,6 +606,27 @@ void dapl_ep_unlink_cm(IN DAPL_EP *ep_ptr, IN dp_ib_cm_handle_t cm_ptr) dapl_os_unlock(&ep_ptr->header.lock); } +static void dapli_ep_flush_evd(DAPL_EVD *evd_ptr) +{ + DAT_RETURN dat_status; + + dapl_os_lock(&evd_ptr->header.lock); + dat_status = dapls_evd_copy_cq(evd_ptr); + 
dapl_os_unlock(&evd_ptr->header.lock); + + if (dat_status == DAT_QUEUE_FULL) + dapls_evd_post_overflow_event(evd_ptr); +} + +void dapls_ep_flush_cqs(DAPL_EP * ep_ptr) +{ + if (ep_ptr->param.request_evd_handle) + dapli_ep_flush_evd((DAPL_EVD *) ep_ptr->param.request_evd_handle); + + if (ep_ptr->param.recv_evd_handle) + dapli_ep_flush_evd((DAPL_EVD *) ep_ptr->param.recv_evd_handle); +} + /* * Local variables: * c-indent-level: 4 diff --git a/dapl/common/dapl_ep_util.h b/dapl/common/dapl_ep_util.h index 31d0e23..37805d4 100644 --- a/dapl/common/dapl_ep_util.h +++ b/dapl/common/dapl_ep_util.h @@ -101,5 +101,7 @@ STATIC _INLINE_ dp_ib_cm_handle_t dapl_get_cm_from_ep(IN DAPL_EP *ep_ptr) return cm_ptr; } - + +extern void dapls_ep_flush_cqs(DAPL_EP * ep_ptr); + #endif /* _DAPL_EP_UTIL_H_ */ diff --git a/dapl/common/dapl_evd_util.c b/dapl/common/dapl_evd_util.c index 12d38ff..237293b 100644 --- a/dapl/common/dapl_evd_util.c +++ b/dapl/common/dapl_evd_util.c @@ -160,15 +160,6 @@ dapls_evd_internal_create(DAPL_IA * ia_ptr, goto bail; } - /* - * If we are dealing with event streams besides a CQ event stream, - * be conservative and set producer side locking. Otherwise, no. - * Note: CNO is not considered CQ event stream. - */ - evd_ptr->evd_producer_locking_needed = - (!(evd_flags & (DAT_EVD_DTO_FLAG | DAT_EVD_RMR_BIND_FLAG)) || - evd_ptr->cno_ptr); - /* Before we setup any callbacks, transition state to OPEN. */ evd_ptr->evd_state = DAPL_EVD_STATE_OPEN; @@ -299,7 +290,6 @@ DAPL_EVD *dapls_evd_alloc(IN DAPL_IA * ia_ptr, evd_ptr->evd_flags = evd_flags; evd_ptr->evd_enabled = DAT_TRUE; evd_ptr->evd_waitable = DAT_TRUE; - evd_ptr->evd_producer_locking_needed = 1; /* Conservative value. */ evd_ptr->ib_cq_handle = IB_INVALID_HANDLE; dapl_os_atomic_set(&evd_ptr->evd_ref_count, 0); evd_ptr->catastrophic_overflow = DAT_FALSE; @@ -583,60 +573,12 @@ void dapli_evd_eh_print_cqe(IN ib_work_completion_t * cqe_ptr) * Event posting code follows. 
*/ -/* - * These next two functions (dapli_evd_get_event and dapli_evd_post_event) - * are a pair. They are always called together, from one of the functions - * at the end of this file (dapl_evd_post_*_event). - * - * Note that if producer side locking is enabled, the first one takes the - * EVD lock and the second releases it. - */ - -/* dapli_evd_get_event - * - * Get an event struct from the evd. The caller should fill in the event - * and call dapl_evd_post_event. - * - * If there are no events available, an overflow event is generated to the - * async EVD handler. - * - * If this EVD required producer locking, a successful return implies - * that the lock is held. - * - * Input: - * evd_ptr - * - * Output: - * event - * - */ - -static DAT_EVENT *dapli_evd_get_event(DAPL_EVD * evd_ptr) -{ - DAT_EVENT *event; - - if (evd_ptr->evd_producer_locking_needed) { - dapl_os_lock(&evd_ptr->header.lock); - } - - event = (DAT_EVENT *) dapls_rbuf_remove(&evd_ptr->free_event_queue); - - /* Release the lock if it was taken and the call failed. */ - if (!event && evd_ptr->evd_producer_locking_needed) { - dapl_os_unlock(&evd_ptr->header.lock); - } - - return event; -} /* dapli_evd_post_event * * Post the to the evd. If possible, invoke the evd's CNO. * Otherwise post the event on the pending queue. * - * If producer side locking is required, the EVD lock must be held upon - * entry to this function. - * * Input: * evd_ptr * event @@ -650,7 +592,6 @@ static void dapli_evd_post_event(IN DAPL_EVD * evd_ptr, IN const DAT_EVENT * event_ptr) { DAT_RETURN dat_status; - DAPL_CNO *cno_to_trigger = NULL; dapl_dbg_log(DAPL_DBG_TYPE_EVD, "%s: %s evd %p state %d\n", __FUNCTION__, dapl_event_str(event_ptr->event_number), @@ -665,110 +606,37 @@ dapli_evd_post_event(IN DAPL_EVD * evd_ptr, IN const DAT_EVENT * event_ptr) if (evd_ptr->evd_state == DAPL_EVD_STATE_OPEN) { /* No waiter. Arrange to trigger a CNO if it exists. 
*/ - - if (evd_ptr->evd_enabled) { - cno_to_trigger = evd_ptr->cno_ptr; - } - if (evd_ptr->evd_producer_locking_needed) { - dapl_os_unlock(&evd_ptr->header.lock); - } + if (evd_ptr->evd_enabled && evd_ptr->cno_ptr) + dapl_internal_cno_trigger(evd_ptr->cno_ptr, evd_ptr); } else { - /* - * We're in DAPL_EVD_STATE_WAITED. Take the lock if - * we don't have it, recheck, and signal. - */ - if (!evd_ptr->evd_producer_locking_needed) { - dapl_os_lock(&evd_ptr->header.lock); - } - if (evd_ptr->evd_state == DAPL_EVD_STATE_WAITED && (dapls_rbuf_count(&evd_ptr->pending_event_queue) >= evd_ptr->threshold)) { - dapl_os_unlock(&evd_ptr->header.lock); if (evd_ptr->evd_flags & (DAT_EVD_DTO_FLAG | DAT_EVD_RMR_BIND_FLAG)) { dapls_evd_dto_wakeup(evd_ptr); } else { dapl_os_wait_object_wakeup(&evd_ptr->wait_object); } - - } else { - dapl_os_unlock(&evd_ptr->header.lock); } } - - if (cno_to_trigger != NULL) { - dapl_internal_cno_trigger(cno_to_trigger, evd_ptr); - } } -/* dapli_evd_post_event_nosignal - * - * Post the to the evd. Do not do any wakeup processing. - * This function should only be called if it is known that there are - * no waiters that it is appropriate to wakeup on this EVD. An example - * of such a situation is during internal dat_evd_wait() processing. - * - * If producer side locking is required, the EVD lock must be held upon - * entry to this function. 
- * - * Input: - * evd_ptr - * event - * - * Output: - * none - * - */ - -static void -dapli_evd_post_event_nosignal(IN DAPL_EVD * evd_ptr, - IN const DAT_EVENT * event_ptr) +static DAT_EVENT *dapli_evd_get_and_init_event(IN DAPL_EVD * evd_ptr, + IN DAT_EVENT_NUMBER event_number) { - DAT_RETURN dat_status; - - dapl_dbg_log(DAPL_DBG_TYPE_EVD, "%s: Called with event %s\n", - __FUNCTION__, dapl_event_str(event_ptr->event_number)); - - dat_status = dapls_rbuf_add(&evd_ptr->pending_event_queue, - (void *)event_ptr); - dapl_os_assert(dat_status == DAT_SUCCESS); - - dapl_os_assert(evd_ptr->evd_state == DAPL_EVD_STATE_WAITED - || evd_ptr->evd_state == DAPL_EVD_STATE_OPEN); + DAT_EVENT *event_ptr; - if (evd_ptr->evd_producer_locking_needed) { - dapl_os_unlock(&evd_ptr->header.lock); + event_ptr = (DAT_EVENT *) dapls_rbuf_remove(&evd_ptr->free_event_queue); + if (event_ptr) { + event_ptr->evd_handle = (DAT_EVD_HANDLE) evd_ptr; + event_ptr->event_number = event_number; } -} - -/* dapli_evd_format_overflow_event - * - * format an overflow event for posting - * - * Input: - * evd_ptr - * event_ptr - * - * Output: - * none - * - */ -static void -dapli_evd_format_overflow_event(IN DAPL_EVD * evd_ptr, - OUT DAT_EVENT * event_ptr) -{ - DAPL_IA *ia_ptr; - - ia_ptr = evd_ptr->header.owner_ia; - event_ptr->evd_handle = (DAT_EVD_HANDLE) evd_ptr; - event_ptr->event_number = DAT_ASYNC_ERROR_EVD_OVERFLOW; - event_ptr->event_data.asynch_error_event_data.dat_handle = - (DAT_HANDLE) ia_ptr; + return event_ptr; } -/* dapli_evd_post_overflow_event +/* dapls_evd_post_overflow_event * * post an overflow event * @@ -780,52 +648,38 @@ dapli_evd_format_overflow_event(IN DAPL_EVD * evd_ptr, * none * */ -static void -dapli_evd_post_overflow_event(IN DAPL_EVD * async_evd_ptr, - IN DAPL_EVD * overflow_evd_ptr) +void +dapls_evd_post_overflow_event(IN DAPL_EVD * evd_ptr) { - DAT_EVENT *overflow_event; + DAPL_EVD *async_evd_ptr = evd_ptr->header.owner_ia->async_error_evd; + DAT_EVENT *event_ptr; - /* The 
overflow_evd_ptr mght be the same as evd. - * In that case we've got a catastrophic overflow. - */ - dapl_log(DAPL_DBG_TYPE_WARN, - " WARNING: overflow event on EVD %p/n", overflow_evd_ptr); + dapl_log(DAPL_DBG_TYPE_WARN, " WARNING: overflow event on EVD %p\n", evd_ptr); - if (async_evd_ptr == overflow_evd_ptr) { - async_evd_ptr->catastrophic_overflow = DAT_TRUE; - async_evd_ptr->evd_state = DAPL_EVD_STATE_DEAD; - return; - } + dapl_os_lock(&async_evd_ptr->header.lock); - overflow_event = dapli_evd_get_event(overflow_evd_ptr); - if (!overflow_event) { - /* this is not good */ - overflow_evd_ptr->catastrophic_overflow = DAT_TRUE; - overflow_evd_ptr->evd_state = DAPL_EVD_STATE_DEAD; - return; - } - dapli_evd_format_overflow_event(overflow_evd_ptr, overflow_event); - dapli_evd_post_event(overflow_evd_ptr, overflow_event); + /* The overflow evd_ptr might be the same as the async evd. + * In that case we've got a catastrophic overflow. + */ + if (async_evd_ptr == evd_ptr) + goto err; + + event_ptr = dapli_evd_get_and_init_event(async_evd_ptr, + DAT_ASYNC_ERROR_EVD_OVERFLOW); + if (!event_ptr) + goto err; + + event_ptr->event_data.asynch_error_event_data.dat_handle = + (DAT_HANDLE) evd_ptr->header.owner_ia; + dapli_evd_post_event(async_evd_ptr, event_ptr); + dapl_os_unlock(&async_evd_ptr->header.lock); return; -} - -static DAT_EVENT *dapli_evd_get_and_init_event(IN DAPL_EVD * evd_ptr, - IN DAT_EVENT_NUMBER event_number) -{ - DAT_EVENT *event_ptr; - event_ptr = dapli_evd_get_event(evd_ptr); - if (NULL == event_ptr) { - dapli_evd_post_overflow_event(evd_ptr->header.owner_ia-> - async_error_evd, evd_ptr); - } else { - event_ptr->evd_handle = (DAT_EVD_HANDLE) evd_ptr; - event_ptr->event_number = event_number; - } - - return event_ptr; +err: + async_evd_ptr->catastrophic_overflow = DAT_TRUE; + async_evd_ptr->evd_state = DAPL_EVD_STATE_DEAD; + dapl_os_unlock(&async_evd_ptr->header.lock); } DAT_RETURN @@ -837,17 +691,11 @@ dapls_evd_post_cr_arrival_event(IN DAPL_EVD * evd_ptr, 
DAT_CR_HANDLE cr_handle) { DAT_EVENT *event_ptr; - event_ptr = dapli_evd_get_and_init_event(evd_ptr, event_number); - /* - * Note event lock may be held on successful return - * to be released by dapli_evd_post_event(), if provider side locking - * is needed. - */ - if (event_ptr == NULL) { - return DAT_ERROR(DAT_INSUFFICIENT_RESOURCES, - DAT_RESOURCE_MEMORY); - } + dapl_os_lock(&evd_ptr->header.lock); + event_ptr = dapli_evd_get_and_init_event(evd_ptr, event_number); + if (event_ptr == NULL) + goto err; event_ptr->event_data.cr_arrival_event_data.sp_handle = sp_handle; event_ptr->event_data.cr_arrival_event_data.local_ia_address_ptr @@ -856,8 +704,13 @@ dapls_evd_post_cr_arrival_event(IN DAPL_EVD * evd_ptr, event_ptr->event_data.cr_arrival_event_data.cr_handle = cr_handle; dapli_evd_post_event(evd_ptr, event_ptr); - + dapl_os_unlock(&evd_ptr->header.lock); return DAT_SUCCESS; + +err: + dapl_os_unlock(&evd_ptr->header.lock); + dapls_evd_post_overflow_event(evd_ptr); + return DAT_ERROR(DAT_INSUFFICIENT_RESOURCES, DAT_RESOURCE_MEMORY); } DAT_RETURN @@ -868,17 +721,11 @@ dapls_evd_post_connection_event(IN DAPL_EVD * evd_ptr, IN DAT_PVOID private_data) { DAT_EVENT *event_ptr; - event_ptr = dapli_evd_get_and_init_event(evd_ptr, event_number); - /* - * Note event lock may be held on successful return - * to be released by dapli_evd_post_event(), if provider side locking - * is needed. 
- */ - if (event_ptr == NULL) { - return DAT_ERROR(DAT_INSUFFICIENT_RESOURCES, - DAT_RESOURCE_MEMORY); - } + dapl_os_lock(&evd_ptr->header.lock); + event_ptr = dapli_evd_get_and_init_event(evd_ptr, event_number); + if (event_ptr == NULL) + goto err; event_ptr->event_data.connect_event_data.ep_handle = ep_handle; event_ptr->event_data.connect_event_data.private_data_size @@ -886,8 +733,13 @@ dapls_evd_post_connection_event(IN DAPL_EVD * evd_ptr, event_ptr->event_data.connect_event_data.private_data = private_data; dapli_evd_post_event(evd_ptr, event_ptr); - + dapl_os_unlock(&evd_ptr->header.lock); return DAT_SUCCESS; + +err: + dapl_os_unlock(&evd_ptr->header.lock); + dapls_evd_post_overflow_event(evd_ptr); + return DAT_ERROR(DAT_INSUFFICIENT_RESOURCES, DAT_RESOURCE_MEMORY); } DAT_RETURN @@ -896,27 +748,27 @@ dapls_evd_post_async_error_event(IN DAPL_EVD * evd_ptr, IN DAT_IA_HANDLE ia_handle) { DAT_EVENT *event_ptr; - event_ptr = dapli_evd_get_and_init_event(evd_ptr, event_number); - /* - * Note event lock may be held on successful return - * to be released by dapli_evd_post_event(), if provider side locking - * is needed. 
- */ + dapl_log(DAPL_DBG_TYPE_WARN, " WARNING: async event - %s evd=%p\n", dapl_event_str(event_number), evd_ptr); - if (event_ptr == NULL) { - return DAT_ERROR(DAT_INSUFFICIENT_RESOURCES, - DAT_RESOURCE_MEMORY); - } + dapl_os_lock(&evd_ptr->header.lock); + event_ptr = dapli_evd_get_and_init_event(evd_ptr, event_number); + if (event_ptr == NULL) + goto err; event_ptr->event_data.asynch_error_event_data.dat_handle = (DAT_HANDLE) ia_handle; dapli_evd_post_event(evd_ptr, event_ptr); - + dapl_os_unlock(&evd_ptr->header.lock); return DAT_SUCCESS; + +err: + dapl_os_unlock(&evd_ptr->header.lock); + dapls_evd_post_overflow_event(evd_ptr); + return DAT_ERROR(DAT_INSUFFICIENT_RESOURCES, DAT_RESOURCE_MEMORY); } DAT_RETURN @@ -925,23 +777,22 @@ dapls_evd_post_software_event(IN DAPL_EVD * evd_ptr, IN DAT_PVOID pointer) { DAT_EVENT *event_ptr; - event_ptr = dapli_evd_get_and_init_event(evd_ptr, event_number); - /* - * Note event lock may be held on successful return - * to be released by dapli_evd_post_event(), if provider side locking - * is needed. - */ - if (event_ptr == NULL) { - return DAT_ERROR(DAT_INSUFFICIENT_RESOURCES, - DAT_RESOURCE_MEMORY); - } + dapl_os_lock(&evd_ptr->header.lock); + event_ptr = dapli_evd_get_and_init_event(evd_ptr, event_number); + if (event_ptr == NULL) + goto err; event_ptr->event_data.software_event_data.pointer = pointer; dapli_evd_post_event(evd_ptr, event_ptr); - + dapl_os_unlock(&evd_ptr->header.lock); return DAT_SUCCESS; + +err: + dapl_os_unlock(&evd_ptr->header.lock); + dapls_evd_post_overflow_event(evd_ptr); + return DAT_ERROR(DAT_INSUFFICIENT_RESOURCES, DAT_RESOURCE_MEMORY); } /* @@ -968,27 +819,58 @@ dapls_evd_post_generic_event(IN DAPL_EVD * evd_ptr, { DAT_EVENT *event_ptr; + dapl_os_lock(&evd_ptr->header.lock); event_ptr = dapli_evd_get_and_init_event(evd_ptr, event_number); - /* - * Note event lock may be held on successful return - * to be released by dapli_evd_post_event(), if provider side locking - * is needed. 
- */ - - if (event_ptr == NULL) { - return DAT_ERROR(DAT_INSUFFICIENT_RESOURCES, - DAT_RESOURCE_MEMORY); - } + if (event_ptr == NULL) + goto err; event_ptr->event_data = *data; dapli_evd_post_event(evd_ptr, event_ptr); - + dapl_os_unlock(&evd_ptr->header.lock); return DAT_SUCCESS; + +err: + dapl_os_unlock(&evd_ptr->header.lock); + dapls_evd_post_overflow_event(evd_ptr); + return DAT_ERROR(DAT_INSUFFICIENT_RESOURCES, DAT_RESOURCE_MEMORY); } #ifdef DAT_EXTENSIONS DAT_RETURN +dapls_evd_do_post_cr_event_ext(IN DAPL_EVD * evd_ptr, + IN DAT_EVENT_NUMBER event_number, + IN DAPL_SP *sp_ptr, + IN DAPL_CR *cr_ptr, + IN DAT_PVOID ext_data) +{ + DAT_EVENT *event_ptr; + + dapl_os_lock(&evd_ptr->header.lock); + event_ptr = dapli_evd_get_and_init_event(evd_ptr, event_number); + if (event_ptr == NULL) + goto err; + + event_ptr->event_data.cr_arrival_event_data.sp_handle.psp_handle = + (DAT_PSP_HANDLE) sp_ptr; + event_ptr->event_data.cr_arrival_event_data.local_ia_address_ptr = + (DAT_IA_ADDRESS_PTR) &sp_ptr->header.owner_ia->hca_ptr->hca_address; + event_ptr->event_data.cr_arrival_event_data.conn_qual = sp_ptr->conn_qual; + event_ptr->event_data.cr_arrival_event_data.cr_handle = (DAT_CR_HANDLE) cr_ptr; + + dapl_os_memcpy(&event_ptr->event_extension_data[0], ext_data, 64); + + dapli_evd_post_event(sp_ptr->evd_handle, event_ptr); + dapl_os_unlock(&evd_ptr->header.lock); + return DAT_SUCCESS; + +err: + dapl_os_unlock(&evd_ptr->header.lock); + dapls_evd_post_overflow_event(evd_ptr); + return DAT_ERROR(DAT_INSUFFICIENT_RESOURCES, DAT_RESOURCE_MEMORY); +} + +DAT_RETURN dapls_evd_post_cr_event_ext(IN DAPL_SP * sp_ptr, IN DAT_EVENT_NUMBER event_number, IN dp_ib_cm_handle_t ib_cm_handle, @@ -998,7 +880,6 @@ dapls_evd_post_cr_event_ext(IN DAPL_SP * sp_ptr, DAPL_CR *cr_ptr; DAPL_EP *ep_ptr; DAT_EVENT *event_ptr; - DAT_SP_HANDLE sp_handle; dapl_os_lock(&sp_ptr->header.lock); if (sp_ptr->listening == DAT_FALSE) { @@ -1087,36 +968,8 @@ dapls_evd_post_cr_event_ext(IN DAPL_SP * sp_ptr, /* link 
the CR onto the SP so we can pick it up later */ dapl_sp_link_cr(sp_ptr, cr_ptr); - /* assign sp_ptr to union to avoid typecast errors from some compilers */ - sp_handle.psp_handle = (DAT_PSP_HANDLE) sp_ptr; - - /* Post the event. */ - - /* - * Note event lock may be held on successful return - * to be released by dapli_evd_post_event(), if provider side locking - * is needed. - */ - event_ptr = dapli_evd_get_and_init_event(sp_ptr->evd_handle, - event_number); - if (event_ptr == NULL) - return DAT_ERROR(DAT_INSUFFICIENT_RESOURCES, - DAT_RESOURCE_MEMORY); - - event_ptr->event_data.cr_arrival_event_data.sp_handle = sp_handle; - event_ptr->event_data.cr_arrival_event_data.local_ia_address_ptr = - (DAT_IA_ADDRESS_PTR) & sp_ptr->header.owner_ia->hca_ptr-> - hca_address; - event_ptr->event_data.cr_arrival_event_data.conn_qual = - sp_ptr->conn_qual; - event_ptr->event_data.cr_arrival_event_data.cr_handle = - (DAT_HANDLE) cr_ptr; - - dapl_os_memcpy(&event_ptr->event_extension_data[0], ext_data, 64); - - dapli_evd_post_event(sp_ptr->evd_handle, event_ptr); - - return DAT_SUCCESS; + return dapls_evd_do_post_cr_event_ext(sp_ptr->evd_handle, event_number, + sp_ptr, cr_ptr, ext_data); } DAT_RETURN @@ -1128,15 +981,11 @@ dapls_evd_post_connection_event_ext(IN DAPL_EVD * evd_ptr, IN DAT_PVOID ext_data) { DAT_EVENT *event_ptr; + + dapl_os_lock(&evd_ptr->header.lock); event_ptr = dapli_evd_get_and_init_event(evd_ptr, event_number); - /* - * Note event lock may be held on successful return - * to be released by dapli_evd_post_event(), if provider side locking - * is needed. 
- */ if (event_ptr == NULL) - return DAT_ERROR(DAT_INSUFFICIENT_RESOURCES, - DAT_RESOURCE_MEMORY); + goto err; event_ptr->event_data.connect_event_data.ep_handle = ep_handle; event_ptr->event_data.connect_event_data.private_data_size @@ -1146,8 +995,13 @@ dapls_evd_post_connection_event_ext(IN DAPL_EVD * evd_ptr, dapl_os_memcpy(&event_ptr->event_extension_data[0], ext_data, 64); dapli_evd_post_event(evd_ptr, event_ptr); - + dapl_os_unlock(&evd_ptr->header.lock); return DAT_SUCCESS; + +err: + dapl_os_unlock(&evd_ptr->header.lock); + dapls_evd_post_overflow_event(evd_ptr); + return DAT_ERROR(DAT_INSUFFICIENT_RESOURCES, DAT_RESOURCE_MEMORY); } #endif @@ -1187,10 +1041,6 @@ dapli_evd_cqe_to_event(IN DAPL_EVD * evd_ptr, ep_ptr = cookie->ep; dapl_os_assert((NULL != ep_ptr)); - if (ep_ptr->header.magic != DAPL_MAGIC_EP) { - /* ep may have been freed, just return */ - return; - } dapls_io_trc_update_completion(ep_ptr, cookie, dto_status); @@ -1343,18 +1193,8 @@ dapli_evd_cqe_to_event(IN DAPL_EVD * evd_ptr, * Copy all entries on a CQ associated with the EVD onto that EVD * Up to caller to handle races, if any. Note that no EVD waiters will * be awoken by this copy. - * - * Input: - * evd_ptr - * - * Output: - * None - * - * Returns: - * none - * */ -void dapls_evd_copy_cq(DAPL_EVD * evd_ptr) +DAT_RETURN dapls_evd_copy_cq(DAPL_EVD * evd_ptr) { ib_work_completion_t cur_cqe; DAT_RETURN dat_status; @@ -1362,7 +1202,7 @@ void dapls_evd_copy_cq(DAPL_EVD * evd_ptr) if (evd_ptr->ib_cq_handle == IB_INVALID_HANDLE) { /* Nothing to do if no CQ. */ - return; + return DAT_SUCCESS; } while (1) { @@ -1381,18 +1221,13 @@ void dapls_evd_copy_cq(DAPL_EVD * evd_ptr) * Can use DAT_DTO_COMPLETION_EVENT because dapli_evd_cqe_to_event * will overwrite. */ - - event = - dapli_evd_get_and_init_event(evd_ptr, - DAT_DTO_COMPLETION_EVENT); - if (event == NULL) { - /* We've already attempted the overflow post; return. 
*/ - return; - } + event = dapli_evd_get_and_init_event(evd_ptr, DAT_DTO_COMPLETION_EVENT); + if (event == NULL) + return DAT_QUEUE_FULL; dapli_evd_cqe_to_event(evd_ptr, &cur_cqe, event); - dapli_evd_post_event_nosignal(evd_ptr, event); + dapli_evd_post_event(evd_ptr, event); } if (DAT_GET_TYPE(dat_status) != DAT_QUEUE_EMPTY) { @@ -1400,7 +1235,9 @@ void dapls_evd_copy_cq(DAPL_EVD * evd_ptr) "dapls_evd_copy_cq: dapls_ib_completion_poll returned 0x%x\n", dat_status); dapl_os_assert(!"Bad return from dapls_ib_completion_poll"); + return dat_status; } + return DAT_SUCCESS; } /* diff --git a/dapl/common/dapl_evd_util.h b/dapl/common/dapl_evd_util.h index e5a7c3f..65472d7 100644 --- a/dapl/common/dapl_evd_util.h +++ b/dapl/common/dapl_evd_util.h @@ -165,11 +165,14 @@ extern void dapl_evd_qp_async_error_callback ( IN ib_error_record_t * cause_ptr, IN void * context); -extern void dapls_evd_copy_cq ( +extern DAT_RETURN dapls_evd_copy_cq ( DAPL_EVD *evd_ptr); extern DAT_RETURN dapls_evd_cq_poll_to_event ( IN DAPL_EVD *evd_ptr, OUT DAT_EVENT *event); +extern void dapls_evd_post_overflow_event ( + IN DAPL_EVD *evd_ptr); + #endif diff --git a/dapl/include/dapl.h b/dapl/include/dapl.h index 8dab61e..a522f15 100755 --- a/dapl/include/dapl.h +++ b/dapl/include/dapl.h @@ -349,9 +349,6 @@ struct dapl_evd DAT_BOOLEAN evd_enabled; /* For attached CNO. */ DAT_BOOLEAN evd_waitable; /* EVD state. */ - /* Derived from evd_flags; see dapls_evd_internal_create. 
*/ - DAT_BOOLEAN evd_producer_locking_needed; - /* Every EVD has a CQ unless it is a SOFTWARE_EVENT only EVD */ ib_cq_handle_t ib_cq_handle; diff --git a/dapl/udapl/dapl_evd_set_unwaitable.c b/dapl/udapl/dapl_evd_set_unwaitable.c index 718e433..36b632a 100644 --- a/dapl/udapl/dapl_evd_set_unwaitable.c +++ b/dapl/udapl/dapl_evd_set_unwaitable.c @@ -71,7 +71,6 @@ DAT_RETURN DAT_API dapl_evd_set_unwaitable(IN DAT_EVD_HANDLE evd_handle) } dapl_os_lock(&evd_ptr->header.lock); evd_ptr->evd_waitable = DAT_FALSE; - dapl_os_unlock(&evd_ptr->header.lock); /* * If this evd is waiting, wake it up. There is an obvious race @@ -85,6 +84,7 @@ DAT_RETURN DAT_API dapl_evd_set_unwaitable(IN DAT_EVD_HANDLE evd_handle) else dapl_os_wait_object_wakeup(&evd_ptr->wait_object); } + dapl_os_unlock(&evd_ptr->header.lock); bail: return dat_status; } diff --git a/dapl/udapl/dapl_evd_wait.c b/dapl/udapl/dapl_evd_wait.c index 79afb0d..33cec50 100644 --- a/dapl/udapl/dapl_evd_wait.c +++ b/dapl/udapl/dapl_evd_wait.c @@ -168,14 +168,12 @@ DAT_RETURN DAT_API dapl_evd_wait(IN DAT_EVD_HANDLE evd_handle, * return right away if the ib_cq_handle associate with these evd * equal to IB_INVALID_HANDLE */ - dapl_os_unlock(&evd_ptr->header.lock); - dapls_evd_copy_cq(evd_ptr); - dapl_os_lock(&evd_ptr->header.lock); + dat_status = dapls_evd_copy_cq(evd_ptr); + if (dat_status == DAT_QUEUE_FULL) + goto bail; - if (dapls_rbuf_count(&evd_ptr->pending_event_queue) >= - threshold) { + if (dapls_rbuf_count(&evd_ptr->pending_event_queue) >= threshold) break; - } /* * Do not enable the completion notification if this evd is not @@ -266,6 +264,8 @@ DAT_RETURN DAT_API dapl_evd_wait(IN DAT_EVD_HANDLE evd_handle, if (dat_status) { dapl_dbg_log(DAPL_DBG_TYPE_RTN, "dapl_evd_wait () returns 0x%x\n", dat_status); + if (dat_status == DAT_QUEUE_FULL) + dapls_evd_post_overflow_event(evd_ptr); } return dat_status; }