110 size_t send_size_filled = (size_t)-1;
111 size_t max_num_intersect
112 = (size_t)config->xmdd_bucket_gen->get_intersect_max_num(
113 bucket_gen_state, bucket_type);
115 if (max_num_intersect) {
122 size_t send_buffer_size = 0;
124 =
xmalloc(max_num_intersect *
sizeof(*sends));
126 while ((bucket = config->xmdd_bucket_gen->next(
127 bucket_gen_state, bucket_type)).list) {
135 sends[num_msg].list = isect2send;
136 sends[num_msg].rank = (int)
rank;
148 if (idxlist_sorted != idxlist)
150 for (
size_t rank = send_size_filled+1;
rank < (size_t)comm_size; ++
rank)
153 unsigned char *send_buffer =
xmalloc(send_buffer_size);
155 for (
size_t i = 0; i < num_msg; ++i) {
158 (
int)(send_buffer_size-ofs), &position, comm);
159 send_size[sends[i].rank][
SEND_SIZE] = position;
160 ofs += (size_t)position;
166 result.
buffer = send_buffer;
168 memset(send_size, 0, (
size_t)comm_size *
sizeof (*send_size));
181 size_t tx_num = 0, size_sum = 0;
182 for (
size_t i = 0; i < (size_t)comm_size; ++i)
185 size_sum += (size_t)tx_size;
187 if (counts) counts[tx_num] = sizes[i][
SEND_NUM];
205 bucket_gen_state, bucket_type, idxlist,
206 send_size, comm, comm_size, config);
216typedef int (*
tx_fp)(
void *, int, MPI_Datatype, int, int,
221 unsigned char *
buffer, MPI_Request *requests,
225 for (
size_t i = 0; i <
num_msg; ++i)
229 count, MPI_PACKED, rank, tag, comm, requests + i), comm);
230 ofs += (size_t)count;
237 void *recv_buffer, MPI_Request *requests,
247 void *send_buffer, MPI_Request *requests,
261 size_t num_msg = tx_stat.
num_msg, buf_size = tx_stat.
bytes;
266 for (
size_t i = 0; i < num_msg; ++i)
283#if __GNUC__ >= 11 && __GNUC__ <= 13 && defined MPICH
284#pragma GCC diagnostic push
285#pragma GCC diagnostic ignored "-Wstringop-overread"
286#pragma GCC diagnostic ignored "-Wstringop-overflow"
292 int tag_offset_inter, int tag_offset_intra,
294 int remote_size, int comm_size,
297 size_t bgd_size = config->xmdd_bucket_gen->gen_state_size;
298 bgd_size = (bgd_size +
sizeof (
void *) - 1)/
sizeof (
void *) *
sizeof (
void *);
302 = config->xmdd_bucket_gen->init(
303 &bgd, src_idxlist, dst_idxlist, intra_comm, tag_offset_intra,
304 inter_comm, tag_offset_inter, config, config->xmdd_bucket_gen);
306 =
xmalloc(((
size_t)comm_size + (
size_t)remote_size)
307 * 2 *
sizeof(*send_size_local)),
312 void *send_buffer_local
314 tx_stat_local, recv_size_local,
315 send_size_local, src_idxlist,
316 intra_comm, comm_size, config);
317 void *send_buffer_remote
319 tx_stat_remote, recv_size_remote,
320 send_size_remote, dst_idxlist,
321 inter_comm, remote_size, config);
323 size_t num_req = tx_stat_local[0].
num_msg + tx_stat_remote[0].
num_msg
325 MPI_Request *dir_init_requests
326 =
xmalloc(num_req *
sizeof(*dir_init_requests)
327 + tx_stat_local[0].bytes + tx_stat_remote[0].bytes);
328 void *recv_buffer_local = dir_init_requests + num_req,
329 *recv_buffer_remote = ((
unsigned char *)recv_buffer_local
330 + tx_stat_local[0].bytes);
332 size_t req_ofs = tx_stat_local[0].
num_msg;
335 recv_buffer_local, dir_init_requests,
336 tag_intra, intra_comm);
340 recv_buffer_remote, dir_init_requests + req_ofs,
341 tag_inter, inter_comm);
342 req_ofs += tx_stat_remote[0].
num_msg;
345 send_buffer_local, dir_init_requests + req_ofs,
346 tag_intra, intra_comm);
347 req_ofs += tx_stat_local[1].
num_msg;
350 send_buffer_remote, dir_init_requests + req_ofs,
351 tag_inter, inter_comm);
353 xt_mpi_call(MPI_Waitall((
int)num_req, dir_init_requests,
354 MPI_STATUSES_IGNORE), inter_comm);
355 free(send_buffer_local);
356 free(send_buffer_remote);
360 recv_buffer_local, intra_comm);
364 recv_buffer_remote, inter_comm);
365 free(send_size_local);
366 free(dir_init_requests);
373 const struct isect *restrict src_dst_intersections,
378 size_t total_send_size = 0;
379 for (
int i = 0; i < comm_size; ++i)
380 (
void)(send_size_target[i][
SEND_SIZE] = 0),
381 (
void)(send_size_target[i][
SEND_NUM] = 0);
384 xt_mpi_call(MPI_Pack_size(1, MPI_INT, comm, &rank_pack_size), comm);
386 for (
size_t i = 0; i < num_intersections; ++i)
388 size_t msg_size = (size_t)rank_pack_size
390 size_t target_rank = (size_t)src_dst_intersections[i].rank[target];
392 ++(send_size_target[target_rank][
SEND_NUM]);
393 total_send_size += msg_size;
395 assert(total_send_size <= INT_MAX);
396 return total_send_size;
401 struct
isect *restrict src_dst_intersections,
404 bool isect_idxlist_delete,
MPI_Comm comm,
int comm_size) {
406 size_t total_send_size
408 src_dst_intersections,
410 comm, comm_size, send_size);
412 unsigned char *send_buffer =
xmalloc(total_send_size);
413 qsort(src_dst_intersections, num_intersections,
414 sizeof (src_dst_intersections[0]),
420 target, num_intersections, src_dst_intersections,
421 isect_idxlist_delete,
423 send_buffer, total_send_size, &ofs, comm);
424 return (
struct mmsg_buf){ .num_msg = num_requests,
425 .buffer = send_buffer };
430 void *restrict recv_buffer,
431 int *restrict entry_counts,
434 size_t num_msg = tx_stat.
num_msg;
435 int buf_size = (int)tx_stat.
bytes;
437 size_t num_entries_sent = 0;
438 for (
size_t i = 0; i < num_msg; ++i)
439 num_entries_sent += (
size_t)entry_counts[i];
442 + (
sizeof (
struct Xt_com_list) * num_entries_sent));
445 size_t num_entries = 0;
446 for (
size_t i = 0; i < num_msg; ++i) {
447 size_t num_entries_from_rank = (size_t)entry_counts[i];
448 for (
size_t j = 0; j < num_entries_from_rank; ++j) {
449 xt_mpi_call(MPI_Unpack(recv_buffer, buf_size, &position,
450 &entries[num_entries].
rank,
451 1, MPI_INT, comm), comm);
452 entries[num_entries].list =
457 assert(num_entries == num_entries_sent);
467 int tag_offset_inter, int tag_offset_intra,
471 int comm_size, remote_size;
472 xt_mpi_call(MPI_Comm_size(inter_comm, &comm_size), inter_comm);
473 xt_mpi_call(MPI_Comm_remote_size(inter_comm, &remote_size), inter_comm);
477 tag_offset_inter, tag_offset_intra,
478 inter_comm, intra_comm,
479 remote_size, comm_size,
484 =
xmalloc(((
size_t)comm_size + (
size_t)remote_size)
485 * 2U *
sizeof(*send_size_local)),
492 struct isect *src_dst_intersections;
493 size_t num_intersections
496 &src_dst_intersections, config);
499 struct mmsg_buf dd_local, dd_remote;
501 =
pack_dist_dirs(num_intersections, src_dst_intersections, send_size_local,
504 =
pack_dist_dirs(num_intersections, src_dst_intersections, send_size_remote,
506 free(src_dst_intersections);
511 intra_comm), intra_comm);
514 inter_comm), inter_comm);
517 int *isect_counts_recv_local
518 =
xmalloc(((
size_t)comm_size + (
size_t)remote_size) *
sizeof (
int)),
519 *isect_counts_recv_remote = isect_counts_recv_local + comm_size;
520 compress_sizes(send_size_local, comm_size, tx_stat_local+1, NULL);
522 isect_counts_recv_local);
523 compress_sizes(send_size_remote, remote_size, tx_stat_remote+1, NULL);
525 isect_counts_recv_remote);
526 assert(tx_stat_local[1].num_msg == dd_local.
num_msg
531 assert(num_requests <= INT_MAX);
532 MPI_Request *requests
533 =
xmalloc(num_requests *
sizeof(*requests)
534 + tx_stat_local[0].bytes + tx_stat_remote[0].bytes);
535 void *recv_buf_local = requests + num_requests,
536 *recv_buf_remote = (
unsigned char *)recv_buf_local + tx_stat_local[0].bytes;
537 size_t req_ofs = tx_stat_local[0].
num_msg;
541 recv_buf_local, requests, tag_intra, intra_comm);
545 recv_buf_remote, requests+req_ofs, tag_inter, inter_comm);
546 req_ofs += tx_stat_remote[0].
num_msg;
549 dd_local.
buffer, requests+req_ofs, tag_intra,
551 req_ofs += tx_stat_local[1].
num_msg;
554 dd_remote.
buffer, requests+req_ofs, tag_inter,
556 xt_mpi_call(MPI_Waitall((
int)num_requests, requests, MPI_STATUSES_IGNORE),
560 free(send_size_local);
566 isect_counts_recv_local, intra_comm);
569 isect_counts_recv_remote, inter_comm);
571 free(isect_counts_recv_local);
584 INSTR_DEF(this_instr,
"xt_xmap_dist_dir_intercomm_new")
590 int tag_offset_inter, tag_offset_intra;
596 tag_offset_inter, tag_offset_intra,
597 inter_comm, intra_comm, config);
602 Xt_xmap (*xmap_new)(
int num_src_intersections,
604 int num_dst_intersections,
615 src_idxlist, dst_idxlist, inter_comm);
add versions of standard API functions not returning on error
struct dist_dir_pair dist_dirs
struct Xt_com_list entries[]
struct Xt_config_ xt_default_config
implementation of configuration object
#define XT_CONFIG_GET_FORCE_NOSORT(config)
#define XT_CONFIG_GET_DIST_DIR_STRIPING(config)
#define XT_CONFIG_BUCKET_DESTROY(config, bucket_gen_state)
struct Xt_xmap_ * Xt_xmap
Xt_idxlist xt_idxlist_unpack(void *buffer, int buffer_size, int *position, MPI_Comm comm)
Xt_idxlist xt_idxlist_sorted_copy_custom(Xt_idxlist idxlist, Xt_config config)
void xt_idxlist_pack(Xt_idxlist idxlist, void *buffer, int buffer_size, int *position, MPI_Comm comm)
size_t xt_idxlist_get_pack_size(Xt_idxlist idxlist, MPI_Comm comm)
Xt_idxlist xt_idxlist_get_intersection(Xt_idxlist idxlist_src, Xt_idxlist idxlist_dst)
int xt_idxlist_get_sorting(Xt_idxlist idxlist)
void xt_idxlist_delete(Xt_idxlist idxlist)
Provide non-public declarations common to all index lists.
#define xt_idxlist_get_num_indices(idxlist)
MPI_Comm xt_mpi_comm_smart_dup(MPI_Comm comm, int *tag_offset)
void xt_mpi_comm_smart_dedup(MPI_Comm *comm, int tag_offset)
#define xt_mpi_call(call, comm)
@ xt_mpi_tag_xmap_dist_dir_src_send
exchange map declarations
@ Xt_dist_dir_bucket_gen_type_send
@ Xt_dist_dir_bucket_gen_type_recv
Default bucket generator for creation of distributed directories.
int xt_com_list_rank_cmp(const void *a_, const void *b_)
int xt_xmdd_cmp_isect_src_rank(const void *a_, const void *b_)
int xt_xmdd_cmp_isect_dst_rank(const void *a_, const void *b_)
void xt_xmdd_free_dist_dirs(struct dist_dir_pair dist_dirs)
size_t xt_xmap_dist_dir_match_src_dst(const struct dist_dir *src_dist_dir, const struct dist_dir *dst_dist_dir, struct isect **src_dst_intersections, Xt_config config)
void xt_xmap_dist_dir_same_rank_merge(struct dist_dir **dist_dir_results)
size_t xt_xmap_dist_dir_pack_intersections(enum xt_xmdd_direction target, size_t num_intersections, const struct isect *restrict src_dst_intersections, bool isect_idxlist_delete, size_t send_size_asize, size_t send_size_idx, int(*send_size)[send_size_asize], unsigned char *buffer, size_t buf_size, size_t *ofs, MPI_Comm comm)
Utility functions for creation of distributed directories.
static void * create_intersections(void *bucket_gen_state, int bucket_type, struct Xt_xmdd_txstat tx_stat[2], int recv_size[][SEND_SIZE_ASIZE], int send_size[][SEND_SIZE_ASIZE], Xt_idxlist idxlist, MPI_Comm comm, int comm_size, Xt_config config)
static struct dd_result exchange_idxlists(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, int tag_offset_inter, int tag_offset_intra, MPI_Comm inter_comm, MPI_Comm intra_comm, Xt_config config)
static void rank_no_send(size_t rank, int(*restrict send_size)[SEND_SIZE_ASIZE])
static struct dd_result generate_distributed_directories(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, int tag_offset_inter, int tag_offset_intra, MPI_Comm inter_comm, MPI_Comm intra_comm, int remote_size, int comm_size, Xt_config config)
static void irecv_intersections(size_t num_msg, const int(*recv_size)[SEND_SIZE_ASIZE], void *recv_buffer, MPI_Request *requests, int tag, MPI_Comm comm)
static struct dist_dir * unpack_dist_dir_results(struct Xt_xmdd_txstat tx_stat, void *restrict recv_buffer, int *restrict entry_counts, MPI_Comm comm)
static void isend_intersections(size_t num_msg, const int(*send_size)[SEND_SIZE_ASIZE], void *send_buffer, MPI_Request *requests, int tag, MPI_Comm comm)
int(* tx_fp)(void *, int, MPI_Datatype, int, int, MPI_Comm, MPI_Request *)
static struct mmsg_buf compute_and_pack_bucket_intersections(void *bucket_gen_state, int bucket_type, Xt_idxlist idxlist, int(*send_size)[SEND_SIZE_ASIZE], MPI_Comm comm, int comm_size, Xt_config config)
static size_t send_size_from_intersections(size_t num_intersections, const struct isect *restrict src_dst_intersections, enum xt_xmdd_direction target, MPI_Comm comm, int comm_size, int(*restrict send_size_target)[SEND_SIZE_ASIZE])
static void tx_intersections(size_t num_msg, const int(*sizes)[SEND_SIZE_ASIZE], unsigned char *buffer, MPI_Request *requests, int tag, MPI_Comm comm, tx_fp tx_op)
static struct dist_dir * unpack_dist_dir(struct Xt_xmdd_txstat tx_stat, const int(*sizes)[SEND_SIZE_ASIZE], void *buffer, MPI_Comm comm)
static struct mmsg_buf pack_dist_dirs(size_t num_intersections, struct isect *restrict src_dst_intersections, int(*send_size)[SEND_SIZE_ASIZE], enum xt_xmdd_direction target, bool isect_idxlist_delete, MPI_Comm comm, int comm_size)
static void compress_sizes(int(*restrict sizes)[SEND_SIZE_ASIZE], int comm_size, struct Xt_xmdd_txstat *tx_stat, int *counts)
Xt_xmap xt_xmap_dist_dir_intercomm_new(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm inter_comm, MPI_Comm intra_comm)
Xt_xmap xt_xmap_dist_dir_intercomm_custom_new(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm inter_comm, MPI_Comm intra_comm, Xt_config config)
Xt_xmap xt_xmap_intersection_ext_new(int num_src_intersections, const struct Xt_com_list src_com[num_src_intersections], int num_dst_intersections, const struct Xt_com_list dst_com[num_dst_intersections], Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm)
Xt_xmap xt_xmap_intersection_new(int num_src_intersections, const struct Xt_com_list src_com[num_src_intersections], int num_dst_intersections, const struct Xt_com_list dst_com[num_dst_intersections], Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm)