Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Enable Threading in the BTL TCP
Added mca parameter to turn progress thread on/off
Add a flag to check if we have btl progress thread.
Added macro for ob1 matching lock.
Update the AUTHORS file.
  • Loading branch information
Thananon Patinyasakdikul authored and bosilca committed Mar 28, 2016
commit 92062492b9ad4ce6b362e0243c3d481d0492c632
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ [email protected] Sylvain Jeaugey Bull
[email protected] Terry Dontje Sun, Oracle
[email protected] Todd Kordenbrock SNL
[email protected] Tim Mattox IU, Cisco
[email protected] Thananon Patinyasakdikul UTK
[email protected] Tim Prins IU, LANL
[email protected] Tim Woodall LANL
[email protected] Vasily Filipov Mellanox
Expand Down
20 changes: 19 additions & 1 deletion ompi/mca/pml/ob1/pml_ob1.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ typedef struct mca_pml_ob1_t mca_pml_ob1_t;

extern mca_pml_ob1_t mca_pml_ob1;
extern int mca_pml_ob1_output;

extern bool mca_pml_ob1_matching_protection;
/*
* PML interface functions.
*/
Expand Down Expand Up @@ -261,7 +261,25 @@ do { \
OPAL_THREAD_UNLOCK(&mca_pml_ob1.lock); \
} while(0)

/* Acquire the matching lock.  When at least one selected BTL drives its
 * own progress thread (mca_pml_ob1_matching_protection == true), the
 * matching structures can be touched concurrently regardless of the MPI
 * thread level, so the mutex must be taken unconditionally via
 * opal_mutex_lock.  Otherwise defer to OPAL_THREAD_LOCK, which
 * presumably only locks when MPI thread support is enabled — confirm
 * against the OPAL threads implementation. */
#define OB1_MATCHING_LOCK(lock) \
do { \
if( mca_pml_ob1_matching_protection ) { \
opal_mutex_lock(lock); \
} \
else { OPAL_THREAD_LOCK(lock); } \
} while(0)


/* Release the matching lock; must mirror the branch taken by
 * OB1_MATCHING_LOCK for the same value of
 * mca_pml_ob1_matching_protection (the flag is set once at component
 * init, so lock/unlock pairs stay consistent). */
#define OB1_MATCHING_UNLOCK(lock) \
do { \
if( mca_pml_ob1_matching_protection ) { \
opal_mutex_unlock(lock); \
} \
else { OPAL_THREAD_UNLOCK(lock); } \
} while(0)



int mca_pml_ob1_send_fin(ompi_proc_t* proc, mca_bml_base_btl_t* bml_btl,
opal_ptr_t hdr_frag, uint64_t size, uint8_t order, int status);

Expand Down
6 changes: 6 additions & 0 deletions ompi/mca/pml/ob1/pml_ob1_component.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ mca_pml_ob1_component_init( int* priority, bool enable_progress_threads,
static int mca_pml_ob1_component_fini(void);
int mca_pml_ob1_output = 0;
static int mca_pml_ob1_verbose = 0;
bool mca_pml_ob1_matching_protection = false;

mca_pml_base_component_2_0_0_t mca_pml_ob1_component = {
/* First, the mca_base_component_t struct containing meta
Expand Down Expand Up @@ -277,10 +278,15 @@ mca_pml_ob1_component_init( int* priority,
OPAL_LIST_FOREACH(selected_btl, &mca_btl_base_modules_initialized, mca_btl_base_selected_module_t) {
mca_btl_base_module_t *btl = selected_btl->btl_module;

if (btl->btl_flags & MCA_BTL_FLAGS_BTL_PROGRESS_THREAD_ENABLED) {
mca_pml_ob1_matching_protection = true;
}

if (btl->btl_flags & MCA_BTL_FLAGS_SINGLE_ADD_PROCS) {
mca_pml_ob1.super.pml_flags |= MCA_PML_BASE_FLAG_REQUIRE_WORLD;
break;
}

}

/* Set this here (vs in component_open()) because
Expand Down
16 changes: 8 additions & 8 deletions ompi/mca/pml/ob1/pml_ob1_recvfrag.c
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ void mca_pml_ob1_recv_frag_callback_match(mca_btl_base_module_t* btl,
* end points) from being processed, and potentially "losing"
* the fragment.
*/
OPAL_THREAD_LOCK(&comm->matching_lock);
OB1_MATCHING_LOCK(&comm->matching_lock);

/* get sequence number of next message that can be processed */
if(OPAL_UNLIKELY((((uint16_t) hdr->hdr_seq) != ((uint16_t) proc->expected_sequence)) ||
Expand Down Expand Up @@ -194,7 +194,7 @@ void mca_pml_ob1_recv_frag_callback_match(mca_btl_base_module_t* btl,
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);

/* release matching lock before processing fragment */
OPAL_THREAD_UNLOCK(&comm->matching_lock);
OB1_MATCHING_UNLOCK(&comm->matching_lock);

if(OPAL_LIKELY(match)) {
bytes_received = segments->seg_len - OMPI_PML_OB1_MATCH_HDR_LEN;
Expand Down Expand Up @@ -247,7 +247,7 @@ void mca_pml_ob1_recv_frag_callback_match(mca_btl_base_module_t* btl,
return;

slow_path:
OPAL_THREAD_UNLOCK(&comm->matching_lock);
OB1_MATCHING_UNLOCK(&comm->matching_lock);
mca_pml_ob1_recv_frag_match(btl, hdr, segments,
num_segments, MCA_PML_OB1_HDR_TYPE_MATCH);
}
Expand Down Expand Up @@ -668,7 +668,7 @@ static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl,
* end points) from being processed, and potentially "losing"
* the fragment.
*/
OPAL_THREAD_LOCK(&comm->matching_lock);
OB1_MATCHING_LOCK(&comm->matching_lock);

/* get sequence number of next message that can be processed */
next_msg_seq_expected = (uint16_t)proc->expected_sequence;
Expand Down Expand Up @@ -704,7 +704,7 @@ static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl,
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);

/* release matching lock before processing fragment */
OPAL_THREAD_UNLOCK(&comm->matching_lock);
OB1_MATCHING_UNLOCK(&comm->matching_lock);

if(OPAL_LIKELY(match)) {
switch(type) {
Expand All @@ -729,7 +729,7 @@ static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl,
* may now be used to form new matches
*/
if(OPAL_UNLIKELY(opal_list_get_size(&proc->frags_cant_match) > 0)) {
OPAL_THREAD_LOCK(&comm->matching_lock);
OB1_MATCHING_LOCK(&comm->matching_lock);
if((frag = check_cantmatch_for_match(proc))) {
hdr = &frag->hdr.hdr_match;
segments = frag->segments;
Expand All @@ -738,7 +738,7 @@ static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl,
type = hdr->hdr_common.hdr_type;
goto out_of_order_match;
}
OPAL_THREAD_UNLOCK(&comm->matching_lock);
OB1_MATCHING_UNLOCK(&comm->matching_lock);
}

return OMPI_SUCCESS;
Expand All @@ -749,7 +749,7 @@ static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl,
*/
append_frag_to_list(&proc->frags_cant_match, btl, hdr, segments,
num_segments, NULL);
OPAL_THREAD_UNLOCK(&comm->matching_lock);
OB1_MATCHING_UNLOCK(&comm->matching_lock);
return OMPI_SUCCESS;
}

14 changes: 7 additions & 7 deletions ompi/mca/pml/ob1/pml_ob1_recvreq.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ static int mca_pml_ob1_recv_request_cancel(struct ompi_request_t* ompi_request,
mca_pml_ob1_comm_t *ob1_comm = comm->c_pml_comm;

/* The rest should be protected behind the match logic lock */
OPAL_THREAD_LOCK(&ob1_comm->matching_lock);
OB1_MATCHING_LOCK(&ob1_comm->matching_lock);
if( true == request->req_match_received ) { /* way too late to cancel this one */
OPAL_THREAD_UNLOCK(&ob1_comm->matching_lock);
assert( OMPI_ANY_TAG != ompi_request->req_status.MPI_TAG ); /* not matched isn't it */
Expand All @@ -124,7 +124,7 @@ static int mca_pml_ob1_recv_request_cancel(struct ompi_request_t* ompi_request,
* to true. Otherwise, the request will never be freed.
*/
request->req_recv.req_base.req_pml_complete = true;
OPAL_THREAD_UNLOCK(&ob1_comm->matching_lock);
OB1_MATCHING_UNLOCK(&ob1_comm->matching_lock);

OPAL_THREAD_LOCK(&ompi_request_lock);
ompi_request->req_status._cancelled = true;
Expand Down Expand Up @@ -1177,7 +1177,7 @@ void mca_pml_ob1_recv_req_start(mca_pml_ob1_recv_request_t *req)

MCA_PML_BASE_RECV_START(&req->req_recv.req_base);

OPAL_THREAD_LOCK(&ob1_comm->matching_lock);
OB1_MATCHING_LOCK(&ob1_comm->matching_lock);
/**
* The laps of time between the ACTIVATE event and the SEARCH_UNEX one include
* the cost of the request lock.
Expand Down Expand Up @@ -1219,7 +1219,7 @@ void mca_pml_ob1_recv_req_start(mca_pml_ob1_recv_request_t *req)
it when the message comes in. */
append_recv_req_to_queue(queue, req);
req->req_match_received = false;
OPAL_THREAD_UNLOCK(&ob1_comm->matching_lock);
OB1_MATCHING_UNLOCK(&ob1_comm->matching_lock);
} else {
if(OPAL_LIKELY(!IS_PROB_REQ(req))) {
PERUSE_TRACE_COMM_EVENT(PERUSE_COMM_REQ_MATCH_UNEX,
Expand All @@ -1237,7 +1237,7 @@ void mca_pml_ob1_recv_req_start(mca_pml_ob1_recv_request_t *req)

opal_list_remove_item(&proc->unexpected_frags,
(opal_list_item_t*)frag);
OPAL_THREAD_UNLOCK(&ob1_comm->matching_lock);
OB1_MATCHING_UNLOCK(&ob1_comm->matching_lock);

switch(hdr->hdr_common.hdr_type) {
case MCA_PML_OB1_HDR_TYPE_MATCH:
Expand Down Expand Up @@ -1267,14 +1267,14 @@ void mca_pml_ob1_recv_req_start(mca_pml_ob1_recv_request_t *req)
restarted with this request during mrecv */
opal_list_remove_item(&proc->unexpected_frags,
(opal_list_item_t*)frag);
OPAL_THREAD_UNLOCK(&ob1_comm->matching_lock);
OB1_MATCHING_UNLOCK(&ob1_comm->matching_lock);

req->req_recv.req_base.req_addr = frag;
mca_pml_ob1_recv_request_matched_probe(req, frag->btl,
frag->segments, frag->num_segments);

} else {
OPAL_THREAD_UNLOCK(&ob1_comm->matching_lock);
OB1_MATCHING_UNLOCK(&ob1_comm->matching_lock);
mca_pml_ob1_recv_request_matched_probe(req, frag->btl,
frag->segments, frag->num_segments);
}
Expand Down
3 changes: 3 additions & 0 deletions opal/mca/btl/btl.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,9 @@ typedef uint8_t mca_btl_base_tag_t;
* BTLs should not set this flag. */
#define MCA_BTL_FLAGS_SINGLE_ADD_PROCS 0x20000

/* The BTL uses a progress thread and needs protection on the matching logic */
#define MCA_BTL_FLAGS_BTL_PROGRESS_THREAD_ENABLED 0x40000

/* Default exclusivity levels */
#define MCA_BTL_EXCLUSIVITY_HIGH (64*1024) /* internal loopback */
#define MCA_BTL_EXCLUSIVITY_DEFAULT 1024 /* GM/IB/etc. */
Expand Down
34 changes: 22 additions & 12 deletions opal/mca/btl/tcp/btl_tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,20 @@

/* Open MPI includes */
#include "opal/mca/event/event.h"
#include "ompi/class/ompi_free_list.h"
#include "ompi/mca/btl/btl.h"
#include "ompi/mca/btl/base/base.h"
#include "ompi/mca/mpool/mpool.h"
#include "opal/class/opal_free_list.h"
#include "opal/mca/btl/btl.h"
#include "opal/mca/btl/base/base.h"
#include "opal/mca/mpool/mpool.h"
#include "opal/class/opal_hash_table.h"
#include "opal/util/fd.h"

#define MCA_BTL_TCP_STATISTICS 0
BEGIN_C_DECLS

#if (HAVE_PTHREAD_H == 1)
#define MCA_BTL_TCP_USES_PROGRESS_THREAD 1
#define MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD 1
#else
#define MCA_BTL_TCP_USES_PROGRESS_THREAD 0
#define MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD 0
#endif /* (HAVE_PTHREAD_H == 1) */

extern opal_event_base_t* mca_btl_tcp_event_base;
Expand All @@ -80,10 +81,11 @@ extern opal_event_base_t* mca_btl_tcp_event_base;
} \
} while (0)

#if MCA_BTL_TCP_USES_PROGRESS_THREAD
#if MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD
extern opal_list_t mca_btl_tcp_ready_frag_pending_queue;
extern opal_mutex_t mca_btl_tcp_ready_frag_mutex;
extern int mca_btl_tcp_pipe_to_progress[2];
extern int mca_btl_tcp_progress_thread_trigger;

#define MCA_BTL_TCP_CRITICAL_SECTION_ENTER(name) \
opal_mutex_atomic_lock((name))
Expand All @@ -109,9 +111,14 @@ extern int mca_btl_tcp_pipe_to_progress[2];
} while (0)
#define MCA_BTL_TCP_ACTIVATE_EVENT(event, value) \
do { \
opal_event_t* _event = (opal_event_t*)(event); \
opal_fd_write( mca_btl_tcp_pipe_to_progress[1], sizeof(opal_event_t*), \
&_event); \
if(0 < mca_btl_tcp_progress_thread_trigger) { \
opal_event_t* _event = (opal_event_t*)(event); \
opal_fd_write( mca_btl_tcp_pipe_to_progress[1], sizeof(opal_event_t*), \
&_event); \
} \
else { \
opal_event_add(event, (value)); \
} \
} while (0)
#else
#define MCA_BTL_TCP_CRITICAL_SECTION_ENTER(name)
Expand All @@ -124,7 +131,7 @@ extern int mca_btl_tcp_pipe_to_progress[2];
do { \
opal_event_add(event, (value)); \
} while (0)
#endif /* MCA_BTL_TCP_USES_PROGRESS_THREAD */
#endif /* MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD */

/**
* TCP BTL component.
Expand All @@ -143,6 +150,7 @@ struct mca_btl_tcp_component_t {
int tcp_endpoint_cache; /**< amount of cache on each endpoint */
opal_proc_table_t tcp_procs; /**< hash table of tcp proc structures */
opal_mutex_t tcp_lock; /**< lock for accessing module state */
opal_list_t tcp_events;

opal_event_t tcp_recv_event; /**< recv event for IPv4 listen socket */
int tcp_listen_sd; /**< IPv4 listen socket for incoming connection requests */
Expand All @@ -169,7 +177,9 @@ struct mca_btl_tcp_component_t {
opal_free_list_t tcp_frag_max;
opal_free_list_t tcp_frag_user;

#if MCA_BTL_TCP_USES_PROGRESS_THREAD
int tcp_enable_progress_thread; /** Support for tcp progress thread flag */

#if MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD
opal_event_t tcp_recv_thread_async_event;
opal_mutex_t tcp_frag_eager_mutex;
opal_mutex_t tcp_frag_max_mutex;
Expand Down
Loading