80static const char filename[] =
"xt_xmap_dist_dir.c";
84#if __GNUC__ >= 11 && __GNUC__ <= 13 && defined MPICH
85#pragma GCC diagnostic push
86#pragma GCC diagnostic ignored "-Wstringop-overread"
87#pragma GCC diagnostic ignored "-Wstringop-overflow"
122 bgd_size = (bgd_size +
sizeof (
void *) - 1)/
sizeof (
void *) *
sizeof (
void *);
126 &bgd, src_idxlist, dst_idxlist, comm, tag_offset,
140 size_t max_num_intersect
143 size_t send_size_filled = (size_t)-1;
144 if (max_num_intersect) {
145 size_t send_buffer_size = 0;
147 =
xmalloc(2 * max_num_intersect *
sizeof(*sends_dst)),
148 *restrict sends_src = sends_dst + max_num_intersect;
149 size_t num_src_msg = 0, num_dst_msg = 0;
157 src_idxlist_sorted, bucket.
list, config);
159 sends_src[num_src_msg].list = send4src;
160 sends_src[num_src_msg].rank = (int)
rank;
171 bucket.
list, dst_idxlist_sorted, config);
173 sends_dst[num_dst_msg].list = send4dst;
174 sends_dst[num_dst_msg].rank = (int)
rank;
183 send_size_filled =
rank;
186 for (
size_t rank = send_size_filled+1;
rank < (size_t)comm_size; ++
rank)
188 unsigned char *send_buffer
189 = *send_buffer_ =
xmalloc(send_buffer_size);
191 for (
size_t i = 0; i < num_src_msg; ++i) {
194 (
int)(send_buffer_size-ofs), &position, comm);
196 ofs += (size_t)position;
200 for (
size_t i = 0; i < num_dst_msg; ++i) {
203 (
int)(send_buffer_size-ofs), &position, comm);
205 ofs += (size_t)position;
209 num_msg = (int)(num_src_msg + num_dst_msg);
212 memset(send_size, 0, (
size_t)comm_size *
sizeof (*send_size));
215 if (dst_idxlist_sorted != dst_idxlist)
217 if (src_idxlist_sorted != src_idxlist)
220 return (
struct capbi_result){ .stripify = stripify, .num_msg = num_msg };
225 int recv_count,
void * recv_buffer,
int tag,
229 int total_recv_size = 0;
231 for (
int i = 0; i < recv_count; ++i)
235 xt_mpi_call(MPI_Recv(recv_buffer, recv_size, MPI_PACKED, MPI_ANY_SOURCE,
236 tag, comm, &status), comm);
239 xt_mpi_call(MPI_Get_count(&status, MPI_PACKED, &received_count), comm);
247 total_recv_size += received_count;
250 if (total_recv_size != recv_size)
251 Xt_abort(comm,
"ERROR: recv_intersections received wrong number of bytes",
258 MPI_Request *dir_init_send_requests,
259 int tag_offset,
MPI_Comm comm,
int comm_size) {
264 src_tag, comm, comm_size,
265 dir_init_send_requests,
271 dst_tag, comm, comm_size,
272 dir_init_send_requests + txstat.
num_msg,
280 struct dist_dir *src_dist_dir, *dst_dist_dir;
302 .dst = dst_dist_dir };
308 const struct isect *restrict src_dst_intersections,
312 size_t total_send_size = 0;
313 for (
int i = 0; i < comm_size; ++i)
320 xt_mpi_call(MPI_Pack_size(1, MPI_INT, comm, &rank_pack_size), comm);
322 for (
size_t i = 0; i < num_intersections; ++i)
324 int msg_size = rank_pack_size
335 total_send_size += 2*(size_t)msg_size;
337 assert(total_send_size <= INT_MAX);
338 return total_send_size;
344 struct isect *restrict src_dst_intersections,
349 size_t total_send_size
351 src_dst_intersections,
352 comm, comm_size, send_size);
354 unsigned char *send_buffer = (*send_buffer_)
355 =
xmalloc((
size_t)total_send_size);
357 if (num_intersections > 1)
358 qsort(src_dst_intersections, num_intersections,
360 size_t num_send_indices_requests
364 send_buffer, total_send_size, &ofs, comm);
366 if (num_intersections > 1)
367 qsort(src_dst_intersections, num_intersections,
369 num_send_indices_requests
373 send_buffer, total_send_size, &ofs, comm);
374 assert(num_send_indices_requests <= INT_MAX);
375 return (
int)num_send_indices_requests;
390 int recv_size[num_sizes],
391 int (*send_size)[num_sizes],
394#if MPI_VERSION > 2 || ( MPI_VERSION == 2 && MPI_SUBVERSION >= 2)
395 xt_mpi_call(MPI_Reduce_scatter_block((
int *)send_size, (
int *)recv_size,
396 num_sizes, MPI_INT, MPI_SUM,
400 xt_mpi_call(MPI_Comm_size(comm, &comm_size), comm);
402 int *recv_count =
xmalloc((
size_t)comm_size *
sizeof(*recv_count));
403 for (
int i = 0; i < comm_size; ++i) recv_count[i] = num_sizes;
405 xt_mpi_call(MPI_Reduce_scatter(send_size, recv_size, recv_count, MPI_INT,
406 MPI_SUM, comm), comm);
424 void *send_buffer = NULL;
427 =
xmalloc((
size_t)comm_size *
sizeof(*send_size));
431 send_size, &send_buffer,
432 comm, tag_offset, comm_size,
440 MPI_Request *dir_init_send_requests
443 dir_init_send_requests, tag_offset, comm, comm_size);
452 MPI_STATUSES_IGNORE), comm);
453 free(dir_init_send_requests);
462 void *restrict recv_buffer,
int tag,
468 while (recv_size > 0) {
472 xt_mpi_call(MPI_Recv(recv_buffer, recv_size, MPI_PACKED,
473 MPI_ANY_SOURCE, tag, comm, &status), comm);
476 xt_mpi_call(MPI_Get_count(&status, MPI_PACKED, &received_count), comm);
478 recv_size -= received_count;
482 while (received_count > position) {
484 xt_mpi_call(MPI_Unpack(recv_buffer, received_count, &position,
486 1, MPI_INT, comm), comm);
498 Xt_abort(comm,
"ERROR: recv_and_unpack_dist_dir_result"
499 " received wrong number of bytes",
filename, __LINE__);
508 int *num_send_indices_requests,
509 MPI_Request *send_indices_requests,
528 recv_buffer, tag_offset
532 enum { ops_completed_auto_size = 16 };
533 int ops_completed_auto[ops_completed_auto_size];
535 = *num_send_indices_requests > ops_completed_auto_size
536 ?
xmalloc((
size_t)*num_send_indices_requests *
sizeof (*ops_completed))
537 : ops_completed_auto;
540 ops_completed, comm);
544 recv_buffer, tag_offset
551 ops_completed, comm);
559 ops_completed, comm);
562 if (ops_completed != ops_completed_auto) free(ops_completed);
563 return dist_dir_results;
576 xt_mpi_call(MPI_Comm_size(comm, &comm_size), comm);
580 tag_offset, comm, comm_size, config);
585 =
xmalloc((
size_t)comm_size *
sizeof(*send_size));
589 struct isect *src_dst_intersections;
590 size_t num_intersections
592 &src_dst_intersections, config);
595 int num_send_indices_requests
597 send_size, &send_buffer, comm, comm_size);
598 free(src_dst_intersections);
604 MPI_Request *send_indices_requests
605 =
xmalloc((
size_t)num_send_indices_requests
606 *
sizeof(*send_indices_requests));
609 send_indices_requests, tag_offset, comm, comm_size);
613 &num_send_indices_requests,
614 send_indices_requests,
617 xt_mpi_call(MPI_Waitall(num_send_indices_requests, send_indices_requests,
618 MPI_STATUSES_IGNORE), comm);
622 free(send_indices_requests);
624 .dist_dirs.src = ddp.
src,
633 INSTR_DEF(this_instr,
"xt_xmap_all2all_new")
648 Xt_xmap (*xmap_new)(
int num_src_intersections,
650 int num_dst_intersections,
662 src_idxlist, dst_idxlist, newcomm, config);
add versions of standard API functions not returning on error
const struct Xt_xmdd_bucket_gen_ * xmdd_bucket_gen
Xt_xmdd_bucket_gen_next next
Xt_xmdd_bucket_gen_init_state_internal init
Xt_xmdd_bucket_gen_get_intersect_max_num get_intersect_max_num
struct dist_dir_pair dist_dirs
struct Xt_com_list entries[]
struct Xt_config_ xt_default_config
implementation of configuration object
#define XT_CONFIG_GET_FORCE_NOSORT(config)
#define XT_CONFIG_GET_DIST_DIR_STRIPING(config)
#define XT_CONFIG_BUCKET_DESTROY(config, bucket_gen_state)
struct Xt_xmap_ * Xt_xmap
Xt_idxlist xt_idxlist_unpack(void *buffer, int buffer_size, int *position, MPI_Comm comm)
Xt_idxlist xt_idxlist_sorted_copy_custom(Xt_idxlist idxlist, Xt_config config)
void xt_idxlist_pack(Xt_idxlist idxlist, void *buffer, int buffer_size, int *position, MPI_Comm comm)
size_t xt_idxlist_get_pack_size(Xt_idxlist idxlist, MPI_Comm comm)
Xt_idxlist xt_idxlist_get_intersection_custom(Xt_idxlist idxlist_src, Xt_idxlist idxlist_dst, Xt_config config)
int xt_idxlist_get_sorting(Xt_idxlist idxlist)
void xt_idxlist_delete(Xt_idxlist idxlist)
Provide non-public declarations common to all index lists.
#define xt_idxlist_get_num_indices(idxlist)
MPI_Comm xt_mpi_comm_smart_dup(MPI_Comm comm, int *tag_offset)
void xt_mpi_comm_smart_dedup(MPI_Comm *comm, int tag_offset)
bool xt_mpi_test_some(int *restrict num_req, MPI_Request *restrict req, int *restrict ops_completed, MPI_Comm comm)
#define xt_mpi_call(call, comm)
@ xt_mpi_tag_xmap_dist_dir_dst_send
@ xt_mpi_tag_xmap_dist_dir_src_send
exchange map declarations
static const char filename[]
static void rank_no_send(size_t rank, int(*restrict send_size)[SEND_SIZE_ASIZE])
static struct dist_dir_pair recv_and_unpack_intersections(int recv_size[SEND_SIZE_ASIZE], int tag_offset, MPI_Comm comm)
static void recv_and_unpack_intersection(struct dist_dir *dist_dir, int recv_size, int recv_count, void *recv_buffer, int tag, MPI_Comm comm)
Xt_xmap xt_xmap_dist_dir_intracomm_custom_new(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm, Xt_config config)
static struct dd_result exchange_idxlists(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, int tag_offset, MPI_Comm comm, Xt_config config)
Xt_xmap xt_xmap_dist_dir_intracomm_new(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm)
static struct dd_result generate_distributed_directories(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, int tag_offset, MPI_Comm comm, int comm_size, Xt_config config)
static void xt_xmap_dist_dir_reduce_scatter_sizes(int num_sizes, int recv_size[num_sizes], int(*send_size)[num_sizes], MPI_Comm comm)
wrapper for MPI_Reduce_scatter_block if available or MPI_Reduce_scatter if not
static size_t buf_size_from_intersections(size_t num_intersections, const struct isect *restrict src_dst_intersections, MPI_Comm comm, int comm_size, int(*restrict send_size)[SEND_SIZE_ASIZE])
static struct capbi_result compute_and_pack_bucket_intersections(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, int(*send_size)[SEND_SIZE_ASIZE], void **send_buffer_, MPI_Comm comm, int tag_offset, int comm_size, Xt_config config)
static void send_intersections(void *send_buffer, const int(*send_size)[SEND_SIZE_ASIZE], MPI_Request *dir_init_send_requests, int tag_offset, MPI_Comm comm, int comm_size)
static void recv_and_unpack_dist_dir_result(struct dist_dir *dist_dir, int recv_size, void *restrict recv_buffer, int tag, MPI_Comm comm)
static struct dist_dir_pair recv_and_unpack_dist_dir_results(int recv_size[SEND_SIZE_ASIZE], int *num_send_indices_requests, MPI_Request *send_indices_requests, int tag_offset, MPI_Comm comm)
static int pack_src_dst_dist_dirs(size_t num_intersections, struct isect *restrict src_dst_intersections, int(*send_size)[SEND_SIZE_ASIZE], void **send_buffer_, MPI_Comm comm, int comm_size)
@ Xt_dist_dir_bucket_gen_type_sendrecv
Default bucket generator for creation of distributed directories.
int xt_com_list_rank_cmp(const void *a_, const void *b_)
int xt_xmdd_cmp_isect_src_rank(const void *a_, const void *b_)
int xt_xmdd_cmp_isect_dst_rank(const void *a_, const void *b_)
void xt_xmdd_free_dist_dirs(struct dist_dir_pair dist_dirs)
size_t xt_xmap_dist_dir_match_src_dst(const struct dist_dir *src_dist_dir, const struct dist_dir *dst_dist_dir, struct isect **src_dst_intersections, Xt_config config)
void xt_xmap_dist_dir_same_rank_merge(struct dist_dir **dist_dir_results)
size_t xt_xmap_dist_dir_pack_intersections(enum xt_xmdd_direction target, size_t num_intersections, const struct isect *restrict src_dst_intersections, bool isect_idxlist_delete, size_t send_size_asize, size_t send_size_idx, int(*send_size)[send_size_asize], unsigned char *buffer, size_t buf_size, size_t *ofs, MPI_Comm comm)
struct Xt_xmdd_txstat xt_xmap_dist_dir_send_intersections(void *restrict send_buffer, size_t send_size_asize, size_t send_size_entry, int tag, MPI_Comm comm, int rank_lim, MPI_Request *restrict requests, const int send_size[rank_lim][send_size_asize])
Utility functions for creation of distributed directories.
static struct dd_result generate_distributed_directories(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, int tag_offset_inter, int tag_offset_intra, MPI_Comm inter_comm, MPI_Comm intra_comm, int remote_size, int comm_size, Xt_config config)
static struct mmsg_buf compute_and_pack_bucket_intersections(void *bucket_gen_state, int bucket_type, Xt_idxlist idxlist, int(*send_size)[SEND_SIZE_ASIZE], MPI_Comm comm, int comm_size, Xt_config config)
Xt_xmap xt_xmap_intersection_ext_custom_new(int num_src_intersections, const struct Xt_com_list src_com[num_src_intersections], int num_dst_intersections, const struct Xt_com_list dst_com[num_dst_intersections], Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm, Xt_config config)
Xt_xmap xt_xmap_intersection_custom_new(int num_src_intersections, const struct Xt_com_list src_com[num_src_intersections], int num_dst_intersections, const struct Xt_com_list dst_com[num_dst_intersections], Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm, Xt_config config)