Yet Another eXchange Tool 0.11.1
Loading...
Searching...
No Matches
xt_xmap_dist_dir_intercomm.c
Go to the documentation of this file.
1
12/*
13 * Keywords:
14 * Maintainer: Jörg Behrens <behrens@dkrz.de>
15 * Moritz Hanke <hanke@dkrz.de>
16 * Thomas Jahns <jahns@dkrz.de>
17 * URL: https://dkrz-sw.gitlab-pages.dkrz.de/yaxt/
18 *
19 * Redistribution and use in source and binary forms, with or without
20 * modification, are permitted provided that the following conditions are
21 * met:
22 *
23 * Redistributions of source code must retain the above copyright notice,
24 * this list of conditions and the following disclaimer.
25 *
26 * Redistributions in binary form must reproduce the above copyright
27 * notice, this list of conditions and the following disclaimer in the
28 * documentation and/or other materials provided with the distribution.
29 *
30 * Neither the name of the DKRZ GmbH nor the names of its contributors
31 * may be used to endorse or promote products derived from this software
32 * without specific prior written permission.
33 *
34 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
35 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
36 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
37 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
38 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
39 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
40 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
41 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
42 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
43 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
44 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
45 */
46#ifdef HAVE_CONFIG_H
47#include <config.h>
48#endif
49
50#include <stdbool.h>
51#include <stdlib.h>
52#include <stdio.h>
53#include <string.h>
54#include <assert.h>
55#include <limits.h>
56
57#include <mpi.h>
58
59#include "xt/xt_idxlist.h"
61#include "xt/xt_idxvec.h"
62#include "xt/xt_idxstripes.h"
63#include "xt/xt_idxempty.h"
64#include "xt/xt_xmap.h"
65#include "xt/xt_xmap_dist_dir.h"
67#include "xt/xt_mpi.h"
68#include "xt_arithmetic_util.h"
70#include "xt_mpi_internal.h"
71#include "core/core.h"
72#include "core/ppm_xfuncs.h"
74#include "xt_idxlist_internal.h"
76#include "xt_config_internal.h"
77#include "instr.h"
78#include "xt/xt_sort.h"
80
/* Index constants for the per-rank size records used throughout this file.
 * NOTE(review): the enumerator lines are missing from this listing.  The code
 * below references SEND_SIZE, SEND_NUM and SEND_SIZE_ASIZE, which are
 * presumably declared here -- confirm against the original source. */
81enum {
85};
86
87static inline void
88rank_no_send(size_t rank, int (*restrict send_size)[SEND_SIZE_ASIZE])
89{
90 send_size[rank][SEND_SIZE] = 0;
91 send_size[rank][SEND_NUM] = 0;
92}
93
/* Result of packing several messages into one allocation.
 * NOTE(review): the line carrying the struct tag is missing from this
 * listing; later code uses this layout as `struct mmsg_buf`. */
95{
/* number of messages packed into buffer */
96 size_t num_msg;
/* start of the packed data (set to NULL when nothing was packed) */
97 void *buffer;
98};
99
100
101static struct mmsg_buf
103 int bucket_type,
104 Xt_idxlist idxlist,
105 int (*send_size)[SEND_SIZE_ASIZE],
106 MPI_Comm comm, int comm_size,
107 Xt_config config)
108{
109 int nosort_forced = XT_CONFIG_GET_FORCE_NOSORT(config);
110 size_t send_size_filled = (size_t)-1;
111 size_t max_num_intersect
112 = (size_t)config->xmdd_bucket_gen->get_intersect_max_num(
113 bucket_gen_state, bucket_type);
114 struct mmsg_buf result = { 0, 0 };
115 if (max_num_intersect) {
116 Xt_idxlist idxlist_sorted
117 = nosort_forced || xt_idxlist_get_sorting(idxlist) == 1
118 ? idxlist
119 : xt_idxlist_sorted_copy_custom(idxlist, config);
120
121 size_t num_msg = 0;
122 size_t send_buffer_size = 0;
123 struct Xt_com_list *restrict sends
124 = xmalloc(max_num_intersect * sizeof(*sends));
125 struct Xt_com_list bucket;
126 while ((bucket = config->xmdd_bucket_gen->next(
127 bucket_gen_state, bucket_type)).list) {
128 size_t rank;
129 for (rank = send_size_filled + 1; rank < (size_t)bucket.rank; ++rank)
130 rank_no_send(rank, send_size);
131
132 Xt_idxlist isect2send
133 = xt_idxlist_get_intersection(idxlist_sorted, bucket.list);
134 if (xt_idxlist_get_num_indices(isect2send) > 0) {
135 sends[num_msg].list = isect2send;
136 sends[num_msg].rank = (int)rank;
137 send_buffer_size += xt_idxlist_get_pack_size(isect2send, comm);
138 /* send_size[rank][SEND_SIZE] is set below after the actual
139 * pack, because MPI_Pack_size only gives an upper bound,
140 * not the actually needed size */
141 send_size[rank][SEND_NUM] = 1;
142 ++num_msg;
143 } else {
144 rank_no_send(rank, send_size);
145 xt_idxlist_delete(isect2send);
146 }
147 }
148 if (idxlist_sorted != idxlist)
149 xt_idxlist_delete(idxlist_sorted);
150 for (size_t rank = send_size_filled+1; rank < (size_t)comm_size; ++rank)
151 rank_no_send(rank, send_size);
152
153 unsigned char *send_buffer = xmalloc(send_buffer_size);
154 size_t ofs = 0;
155 for (size_t i = 0; i < num_msg; ++i) {
156 int position = 0;
157 xt_idxlist_pack(sends[i].list, send_buffer + ofs,
158 (int)(send_buffer_size-ofs), &position, comm);
159 send_size[sends[i].rank][SEND_SIZE] = position;
160 ofs += (size_t)position;
161 xt_idxlist_delete(sends[i].list);
162 }
163
164 free(sends);
165 result.num_msg = num_msg;
166 result.buffer = send_buffer;
167 } else {
168 memset(send_size, 0, (size_t)comm_size * sizeof (*send_size));
169 result.num_msg = 0;
170 result.buffer = NULL;
171 }
172
173 return result;
174}
175
176
177static void
178compress_sizes(int (*restrict sizes)[SEND_SIZE_ASIZE], int comm_size,
179 struct Xt_xmdd_txstat *tx_stat, int *counts)
180{
181 size_t tx_num = 0, size_sum = 0;
182 for (size_t i = 0; i < (size_t)comm_size; ++i)
183 if (sizes[i][SEND_SIZE]) {
184 int tx_size = sizes[i][SEND_SIZE];
185 size_sum += (size_t)tx_size;
186 sizes[tx_num][SEND_SIZE] = tx_size;
187 if (counts) counts[tx_num] = sizes[i][SEND_NUM];
188 sizes[tx_num][SEND_NUM] = (int)i;
189 ++tx_num;
190 }
191 *tx_stat = (struct Xt_xmdd_txstat){ .bytes = size_sum, .num_msg = tx_num };
192}
193
/* Pack the intersections of idxlist with the generator's buckets, exchange
 * the resulting per-rank size records with all ranks of comm and compress
 * both size tables in place.
 *
 * On return tx_stat[0] summarises recv_size (what this process receives) and
 * tx_stat[1] summarises send_size (what it sends).  The return value is the
 * buffer member of the packing result.
 *
 * NOTE(review): the line naming the packing routine is missing from this
 * listing; judging by the argument list it is presumably
 * compute_and_pack_bucket_intersections -- confirm against the original
 * source. */
194static void *
195create_intersections(void *bucket_gen_state,
196 int bucket_type,
197 struct Xt_xmdd_txstat tx_stat[2],
198 int recv_size[][SEND_SIZE_ASIZE],
199 int send_size[][SEND_SIZE_ASIZE],
200 Xt_idxlist idxlist,
201 MPI_Comm comm, int comm_size, Xt_config config)
202{
203 struct mmsg_buf ddr
205 bucket_gen_state, bucket_type, idxlist,
206 send_size, comm, comm_size, config);
/* each rank passes one record of SEND_SIZE_ASIZE ints to every other rank */
207 xt_mpi_call(MPI_Alltoall((int *)send_size, SEND_SIZE_ASIZE, MPI_INT,
208 (int *)recv_size, SEND_SIZE_ASIZE, MPI_INT, comm),
209 comm);
210 compress_sizes(recv_size, comm_size, tx_stat + 0, NULL);
211 compress_sizes(send_size, comm_size, tx_stat + 1, NULL);
/* the number of packed messages has to match the non-zero send sizes */
212 assert(ddr.num_msg == tx_stat[1].num_msg);
213 return ddr.buffer;
214}
215
216typedef int (*tx_fp)(void *, int, MPI_Datatype, int, int,
217 MPI_Comm, MPI_Request *);
218static void
220 const int (*sizes)[SEND_SIZE_ASIZE],
221 unsigned char *buffer, MPI_Request *requests,
222 int tag, MPI_Comm comm, tx_fp tx_op)
223{
224 size_t ofs = 0;
225 for (size_t i = 0; i < num_msg; ++i)
226 {
227 int rank = sizes[i][SEND_NUM], count = sizes[i][SEND_SIZE];
228 xt_mpi_call(tx_op(buffer + ofs,
229 count, MPI_PACKED, rank, tag, comm, requests + i), comm);
230 ofs += (size_t)count;
231 }
232}
233
234static void
236 const int (*recv_size)[SEND_SIZE_ASIZE],
237 void *recv_buffer, MPI_Request *requests,
238 int tag, MPI_Comm comm)
239{
240 tx_intersections(num_msg, recv_size, recv_buffer, requests, tag, comm,
241 (tx_fp)MPI_Irecv);
242}
243
244static void
246 const int (*send_size)[SEND_SIZE_ASIZE],
247 void *send_buffer, MPI_Request *requests,
248 int tag, MPI_Comm comm)
249{
250 tx_intersections(num_msg, send_size, send_buffer, requests, tag, comm,
251 (tx_fp)MPI_Isend);
252}
253
254
/* Build a distributed directory from a receive buffer: one entry per
 * received message, with the rank taken from the compressed size table and
 * the index list unpacked from buffer.
 *
 * The directory is allocated with xmalloc in one piece (header plus
 * tx_stat.num_msg entries) and returned to the caller.
 *
 * NOTE(review): two lines are missing from this listing -- the one with the
 * function name and first parameter (the body uses it as tx_stat), and the
 * left-hand side of the xt_idxlist_unpack assignment, presumably
 * dist_dir->entries[i].list -- confirm against the original source. */
255static struct dist_dir *
257 const int (*sizes)[SEND_SIZE_ASIZE],
258 void *buffer,
259 MPI_Comm comm)
260{
261 size_t num_msg = tx_stat.num_msg, buf_size = tx_stat.bytes;
262 struct dist_dir *restrict dist_dir
263 = xmalloc(sizeof (*dist_dir) + sizeof (*dist_dir->entries) * num_msg);
264 dist_dir->num_entries = (int)num_msg;
/* position is shared by all unpack calls, so each one continues where the
 * previous one stopped */
265 int position = 0;
266 for (size_t i = 0; i < num_msg; ++i)
267 {
268 int rank = sizes[i][SEND_NUM];
269 dist_dir->entries[i].rank = rank;
271 = xt_idxlist_unpack(buffer, (int)buf_size, &position, comm);
272 }
273 return dist_dir;
274}
275
/* Return value of the directory exchange steps in this file.
 * NOTE(review): one member line is missing from this listing; the code
 * accesses results.dist_dirs.src and results.dist_dirs.dst, so a dist_dirs
 * member is presumably declared here -- confirm against the original
 * source. */
276struct dd_result {
/* value returned by the bucket generator's init call */
278 int stripify;
279};
280
281/* unfortunately GCC 11 to 13 cannot handle the literal constants used for
282 * MPI_STATUSES_IGNORE by MPICH */
283#if __GNUC__ >= 11 && __GNUC__ <= 13 && defined MPICH
284#pragma GCC diagnostic push
285#pragma GCC diagnostic ignored "-Wstringop-overread"
286#pragma GCC diagnostic ignored "-Wstringop-overflow"
287#endif
288
/* Build the bucket-level distributed directories: intersect the local index
 * lists with the buckets of the configured generator, exchange the packed
 * intersections and unpack what was received.
 *
 * What reaches this process over intra_comm becomes results.dist_dirs.src,
 * what reaches it over inter_comm becomes results.dist_dirs.dst.
 * results.stripify is the return value of the bucket generator's init call.
 * comm_size sizes the tables used with intra_comm, remote_size those used
 * with inter_comm.
 *
 * NOTE(review): three lines are missing from this listing -- the one with
 * the function name and first parameter (the body uses it as src_idxlist)
 * and the two lines starting the calls that fill send_buffer_local and
 * send_buffer_remote (presumably create_intersections) -- confirm against
 * the original source. */
289static struct dd_result
291 Xt_idxlist dst_idxlist,
292 int tag_offset_inter, int tag_offset_intra,
293 MPI_Comm inter_comm, MPI_Comm intra_comm,
294 int remote_size, int comm_size,
295 Xt_config config) {
296
/* storage for the bucket generator state, kept on the stack */
297 size_t bgd_size = config->xmdd_bucket_gen->gen_state_size;
/* round the byte size up to a multiple of the pointer size */
298 bgd_size = (bgd_size + sizeof (void *) - 1)/sizeof (void *) * sizeof (void *);
/* NOTE(review): bgd_size is a byte count at this point but is used as the
 * number of void * elements, so the array occupies sizeof (void *) times
 * the rounded size -- confirm whether bgd_size / sizeof (void *) was
 * intended */
299 void *bgd[bgd_size];
300 struct dd_result results;
301 results.stripify
302 = config->xmdd_bucket_gen->init(
303 &bgd, src_idxlist, dst_idxlist, intra_comm, tag_offset_intra,
304 inter_comm, tag_offset_inter, config, config->xmdd_bucket_gen);
/* one allocation holds all four size tables, laid out as
 * send local | send remote | recv local | recv remote */
305 int (*send_size_local)[SEND_SIZE_ASIZE]
306 = xmalloc(((size_t)comm_size + (size_t)remote_size)
307 * 2 * sizeof(*send_size_local)),
308 (*send_size_remote)[SEND_SIZE_ASIZE] = send_size_local + comm_size,
309 (*recv_size_local)[SEND_SIZE_ASIZE] = send_size_remote + remote_size,
310 (*recv_size_remote)[SEND_SIZE_ASIZE] = recv_size_local + comm_size;
/* index 0 of each tx_stat pair describes receives, index 1 sends */
311 struct Xt_xmdd_txstat tx_stat_local[2], tx_stat_remote[2];
312 void *send_buffer_local
314 tx_stat_local, recv_size_local,
315 send_size_local, src_idxlist,
316 intra_comm, comm_size, config);
317 void *send_buffer_remote
319 tx_stat_remote, recv_size_remote,
320 send_size_remote, dst_idxlist,
321 inter_comm, remote_size, config);
322 XT_CONFIG_BUCKET_DESTROY(config, &bgd);
323 size_t num_req = tx_stat_local[0].num_msg + tx_stat_remote[0].num_msg
324 + tx_stat_local[1].num_msg + tx_stat_remote[1].num_msg;
/* the request array and both receive buffers share one allocation; the
 * receive buffers start right behind the last request */
325 MPI_Request *dir_init_requests
326 = xmalloc(num_req * sizeof(*dir_init_requests)
327 + tx_stat_local[0].bytes + tx_stat_remote[0].bytes);
328 void *recv_buffer_local = dir_init_requests + num_req,
329 *recv_buffer_remote = ((unsigned char *)recv_buffer_local
330 + tx_stat_local[0].bytes);
331 int tag_intra = tag_offset_intra + xt_mpi_tag_xmap_dist_dir_src_send;
/* requests are filled in the order: local receives, remote receives,
 * local sends, remote sends */
332 size_t req_ofs = tx_stat_local[0].num_msg;
333 irecv_intersections(tx_stat_local[0].num_msg,
334 (const int (*)[SEND_SIZE_ASIZE])recv_size_local,
335 recv_buffer_local, dir_init_requests,
336 tag_intra, intra_comm);
337 int tag_inter = tag_offset_inter + xt_mpi_tag_xmap_dist_dir_src_send;
338 irecv_intersections(tx_stat_remote[0].num_msg,
339 (const int (*)[SEND_SIZE_ASIZE])recv_size_remote,
340 recv_buffer_remote, dir_init_requests + req_ofs,
341 tag_inter, inter_comm);
342 req_ofs += tx_stat_remote[0].num_msg;
343 isend_intersections(tx_stat_local[1].num_msg,
344 (const int (*)[SEND_SIZE_ASIZE])send_size_local,
345 send_buffer_local, dir_init_requests + req_ofs,
346 tag_intra, intra_comm);
347 req_ofs += tx_stat_local[1].num_msg;
348 isend_intersections(tx_stat_remote[1].num_msg,
349 (const int (*)[SEND_SIZE_ASIZE])send_size_remote,
350 send_buffer_remote, dir_init_requests + req_ofs,
351 tag_inter, inter_comm);
352 // wait for data transfers to complete
353 xt_mpi_call(MPI_Waitall((int)num_req, dir_init_requests,
354 MPI_STATUSES_IGNORE), inter_comm);
/* all sends are complete, so the packed send data is no longer needed */
355 free(send_buffer_local);
356 free(send_buffer_remote);
357 results.dist_dirs.src
358 = unpack_dist_dir(tx_stat_local[0],
359 (const int (*)[SEND_SIZE_ASIZE])recv_size_local,
360 recv_buffer_local, intra_comm);
361 results.dist_dirs.dst
362 = unpack_dist_dir(tx_stat_remote[0],
363 (const int (*)[SEND_SIZE_ASIZE])recv_size_remote,
364 recv_buffer_remote, inter_comm);
/* send_size_local is the start of the allocation shared by all four size
 * tables; dir_init_requests also owns both receive buffers */
365 free(send_size_local);
366 free(dir_init_requests);
367 return results;
368}
369
370
371static size_t
372send_size_from_intersections(size_t num_intersections,
373 const struct isect *restrict src_dst_intersections,
374 enum xt_xmdd_direction target,
375 MPI_Comm comm, int comm_size,
376 int (*restrict send_size_target)[SEND_SIZE_ASIZE])
377{
378 size_t total_send_size = 0;
379 for (int i = 0; i < comm_size; ++i)
380 (void)(send_size_target[i][SEND_SIZE] = 0),
381 (void)(send_size_target[i][SEND_NUM] = 0);
382
383 int rank_pack_size;
384 xt_mpi_call(MPI_Pack_size(1, MPI_INT, comm, &rank_pack_size), comm);
385
386 for (size_t i = 0; i < num_intersections; ++i)
387 {
388 size_t msg_size = (size_t)rank_pack_size
389 + xt_idxlist_get_pack_size(src_dst_intersections[i].idxlist, comm);
390 size_t target_rank = (size_t)src_dst_intersections[i].rank[target];
391 /* send_size_target[target_rank][SEND_SIZE] += msg_size; */
392 ++(send_size_target[target_rank][SEND_NUM]);
393 total_send_size += msg_size;
394 }
395 assert(total_send_size <= INT_MAX);
396 return total_send_size;
397}
398
/* Pack the matched intersections for sending to the ranks selected by
 * target.
 *
 * send_size is initialised by send_size_from_intersections, which also
 * yields the size of the send buffer allocated here.  The intersections are
 * sorted in place with qsort before packing.  Returns the number of requests
 * reported by the packing call together with the xmalloc'ed send buffer.
 *
 * NOTE(review): two lines are missing from this listing -- the rest of the
 * qsort comparator expression (presumably selecting between
 * xt_xmdd_cmp_isect_src_rank and xt_xmdd_cmp_isect_dst_rank) and the start
 * of the call that yields num_requests (presumably
 * xt_xmap_dist_dir_pack_intersections) -- confirm against the original
 * source. */
399static struct mmsg_buf
400pack_dist_dirs(size_t num_intersections,
401 struct isect *restrict src_dst_intersections,
402 int (*send_size)[SEND_SIZE_ASIZE],
403 enum xt_xmdd_direction target,
404 bool isect_idxlist_delete, MPI_Comm comm, int comm_size) {
405
406 size_t total_send_size
407 = send_size_from_intersections(num_intersections,
408 src_dst_intersections,
409 target,
410 comm, comm_size, send_size);
411
412 unsigned char *send_buffer = xmalloc(total_send_size);
413 qsort(src_dst_intersections, num_intersections,
414 sizeof (src_dst_intersections[0]),
415 target == xt_xmdd_direction_src
417 size_t ofs = 0;
418 size_t num_requests
420 target, num_intersections, src_dst_intersections,
421 isect_idxlist_delete,
422 SEND_SIZE_ASIZE, SEND_SIZE, send_size,
423 send_buffer, total_send_size, &ofs, comm);
424 return (struct mmsg_buf){ .num_msg = num_requests,
425 .buffer = send_buffer };
426}
427
/* Unpack received directory results into a newly allocated dist_dir.
 *
 * entry_counts[i] is the number of entries contained in message i.  Each
 * entry is stored in recv_buffer as one MPI_INT (the rank) followed by a
 * packed index list.  The entries are sorted with xt_com_list_rank_cmp
 * before the directory is returned.
 *
 * NOTE(review): two lines are missing from this listing -- the one with the
 * function name and first parameter (the body uses it as tx_stat) and one
 * statement between the qsort call and the return (presumably a call to
 * xt_xmap_dist_dir_same_rank_merge) -- confirm against the original
 * source. */
428static struct dist_dir *
430 void *restrict recv_buffer,
431 int *restrict entry_counts,
432 MPI_Comm comm)
433{
434 size_t num_msg = tx_stat.num_msg;
435 int buf_size = (int)tx_stat.bytes;
/* position is shared by all unpack calls and advances through the buffer */
436 int position = 0;
/* first pass: total number of entries, to size the allocation */
437 size_t num_entries_sent = 0;
438 for (size_t i = 0; i < num_msg; ++i)
439 num_entries_sent += (size_t)entry_counts[i];
440 struct dist_dir *dist_dir
441 = xmalloc(sizeof (struct dist_dir)
442 + (sizeof (struct Xt_com_list) * num_entries_sent));
443 dist_dir->num_entries = (int)num_entries_sent;
444 struct Xt_com_list *restrict entries = dist_dir->entries;
/* second pass: unpack rank and index list of every entry */
445 size_t num_entries = 0;
446 for (size_t i = 0; i < num_msg; ++i) {
447 size_t num_entries_from_rank = (size_t)entry_counts[i];
448 for (size_t j = 0; j < num_entries_from_rank; ++j) {
449 xt_mpi_call(MPI_Unpack(recv_buffer, buf_size, &position,
450 &entries[num_entries].rank,
451 1, MPI_INT, comm), comm);
452 entries[num_entries].list =
453 xt_idxlist_unpack(recv_buffer, buf_size, &position, comm);
454 ++num_entries;
455 }
456 }
457 assert(num_entries == num_entries_sent);
458 qsort(entries, num_entries_sent, sizeof(*entries), xt_com_list_rank_cmp);
460 return dist_dir;
461}
462
463
/* Compute the communication partners for both index lists.
 *
 * First the bucket-level distributed directories are generated, then source
 * and destination entries in them are matched.  The resulting intersections
 * are packed twice: once addressed by source rank (sent over intra_comm) and
 * once by destination rank (sent over inter_comm).  After the exchange the
 * received data is unpacked into results.dist_dirs.src (from intra_comm) and
 * results.dist_dirs.dst (from inter_comm).  results.stripify is passed on
 * from the directory generation step.
 *
 * NOTE(review): two lines are missing from this listing -- the one with the
 * function name and first parameter (the body uses it as src_idxlist) and
 * the start of the call that yields num_intersections (presumably
 * xt_xmap_dist_dir_match_src_dst) -- confirm against the original
 * source. */
464static struct dd_result
466 Xt_idxlist dst_idxlist,
467 int tag_offset_inter, int tag_offset_intra,
468 MPI_Comm inter_comm, MPI_Comm intra_comm,
469 Xt_config config) {
470
/* comm_size is queried from inter_comm and also used as the table size for
 * intra_comm; remote_size is the size of inter_comm's remote group */
471 int comm_size, remote_size;
472 xt_mpi_call(MPI_Comm_size(inter_comm, &comm_size), inter_comm);
473 xt_mpi_call(MPI_Comm_remote_size(inter_comm, &remote_size), inter_comm);
474
475 struct dd_result bucket_isects
476 = generate_distributed_directories(src_idxlist, dst_idxlist,
477 tag_offset_inter, tag_offset_intra,
478 inter_comm, intra_comm,
479 remote_size, comm_size,
480 config);
481
482
/* one allocation holds all four size tables, laid out as
 * send local | recv local | send remote | recv remote */
483 int (*send_size_local)[SEND_SIZE_ASIZE]
484 = xmalloc(((size_t)comm_size + (size_t)remote_size)
485 * 2U * sizeof(*send_size_local)),
486 (*recv_size_local)[SEND_SIZE_ASIZE] = send_size_local + comm_size,
487 (*send_size_remote)[SEND_SIZE_ASIZE] = recv_size_local + comm_size,
488 (*recv_size_remote)[SEND_SIZE_ASIZE] = send_size_remote + remote_size;
489
490 /* match the source and destination entries in the local distributed
491 * directories... */
492 struct isect *src_dst_intersections;
493 size_t num_intersections
495 bucket_isects.dist_dirs.dst,
496 &src_dst_intersections, config);
497 xt_xmdd_free_dist_dirs(bucket_isects.dist_dirs);
498 /* ... and pack the results into a sendable format */
499 struct mmsg_buf dd_local, dd_remote;
/* the first call passes false, the second true for isect_idxlist_delete */
500 dd_local
501 = pack_dist_dirs(num_intersections, src_dst_intersections, send_size_local,
502 xt_xmdd_direction_src, false, intra_comm, comm_size);
503 dd_remote
504 = pack_dist_dirs(num_intersections, src_dst_intersections, send_size_remote,
505 xt_xmdd_direction_dst, true, inter_comm, remote_size);
506 free(src_dst_intersections);
507
508 // get the data size the local process will receive from other processes
509 xt_mpi_call(MPI_Alltoall((int *)send_size_local, SEND_SIZE_ASIZE, MPI_INT,
510 (int *)recv_size_local, SEND_SIZE_ASIZE, MPI_INT,
511 intra_comm), intra_comm);
512 xt_mpi_call(MPI_Alltoall((int *)send_size_remote, SEND_SIZE_ASIZE, MPI_INT,
513 (int *)recv_size_remote, SEND_SIZE_ASIZE, MPI_INT,
514 inter_comm), inter_comm);
515
/* index 0 of each tx_stat pair describes receives, index 1 sends */
516 struct Xt_xmdd_txstat tx_stat_local[2], tx_stat_remote[2];
/* per-message entry counts of the receive side; compress_sizes overwrites
 * the SEND_NUM slots with rank numbers, so the counts are saved here */
517 int *isect_counts_recv_local
518 = xmalloc(((size_t)comm_size + (size_t)remote_size) * sizeof (int)),
519 *isect_counts_recv_remote = isect_counts_recv_local + comm_size;
520 compress_sizes(send_size_local, comm_size, tx_stat_local+1, NULL);
521 compress_sizes(recv_size_local, comm_size, tx_stat_local+0,
522 isect_counts_recv_local);
523 compress_sizes(send_size_remote, remote_size, tx_stat_remote+1, NULL);
524 compress_sizes(recv_size_remote, remote_size, tx_stat_remote+0,
525 isect_counts_recv_remote);
526 assert(tx_stat_local[1].num_msg == dd_local.num_msg
527 && tx_stat_remote[1].num_msg == dd_remote.num_msg);
528 size_t num_requests
529 = dd_local.num_msg + dd_remote.num_msg
530 + tx_stat_local[0].num_msg + tx_stat_remote[0].num_msg;
/* MPI_Waitall below takes the request count as int */
531 assert(num_requests <= INT_MAX);
/* the request array and both receive buffers share one allocation; the
 * receive buffers start right behind the last request */
532 MPI_Request *requests
533 = xmalloc(num_requests * sizeof(*requests)
534 + tx_stat_local[0].bytes + tx_stat_remote[0].bytes);
535 void *recv_buf_local = requests + num_requests,
536 *recv_buf_remote = (unsigned char *)recv_buf_local + tx_stat_local[0].bytes;
/* requests are filled in the order: local receives, remote receives,
 * local sends, remote sends */
537 size_t req_ofs = tx_stat_local[0].num_msg;
538 int tag_intra = tag_offset_intra + xt_mpi_tag_xmap_dist_dir_src_send;
539 irecv_intersections(tx_stat_local[0].num_msg,
540 (const int (*)[SEND_SIZE_ASIZE])recv_size_local,
541 recv_buf_local, requests, tag_intra, intra_comm);
542 int tag_inter = tag_offset_inter + xt_mpi_tag_xmap_dist_dir_src_send;
543 irecv_intersections(tx_stat_remote[0].num_msg,
544 (const int (*)[SEND_SIZE_ASIZE])recv_size_remote,
545 recv_buf_remote, requests+req_ofs, tag_inter, inter_comm);
546 req_ofs += tx_stat_remote[0].num_msg;
547 isend_intersections(tx_stat_local[1].num_msg,
548 (const int (*)[SEND_SIZE_ASIZE])send_size_local,
549 dd_local.buffer, requests+req_ofs, tag_intra,
550 intra_comm);
551 req_ofs += tx_stat_local[1].num_msg;
552 isend_intersections(tx_stat_remote[1].num_msg,
553 (const int (*)[SEND_SIZE_ASIZE])send_size_remote,
554 dd_remote.buffer, requests+req_ofs, tag_inter,
555 inter_comm);
556 xt_mpi_call(MPI_Waitall((int)num_requests, requests, MPI_STATUSES_IGNORE),
557 inter_comm);
/* all transfers are complete: send buffers and size tables can go;
 * send_size_local is the start of the allocation shared by all four tables */
558 free(dd_local.buffer);
559 free(dd_remote.buffer);
560 free(send_size_local);
561
562 struct dd_result results;
563 results.stripify = bucket_isects.stripify;
564 results.dist_dirs.src
565 = unpack_dist_dir_results(tx_stat_local[0], recv_buf_local,
566 isect_counts_recv_local, intra_comm);
567 results.dist_dirs.dst
568 = unpack_dist_dir_results(tx_stat_remote[0], recv_buf_remote,
569 isect_counts_recv_remote, inter_comm);
/* requests also owns both receive buffers, which are unpacked by now */
570 free(requests);
571 free(isect_counts_recv_local);
572 return results;
573}
574
575
576
/* Create an exchange map with a caller-supplied configuration: the partner
 * lists computed by exchange_idxlists are handed to the xmap constructor
 * selected through xmap_new.
 *
 * NOTE(review): several lines are missing from this listing -- the return
 * type, function name and first parameter (the body uses it as
 * src_idxlist), the declaration of stripify, the initialiser of xmap_new
 * and one statement before INSTR_STOP.  The index at the end of the listing
 * names this function xt_xmap_dist_dir_intercomm_custom_new -- confirm
 * against the original source. */
579 Xt_idxlist dst_idxlist,
580 MPI_Comm inter_comm_,
581 MPI_Comm intra_comm_,
582 Xt_config config)
583{
/* NOTE(review): the instrumentation label names the non-custom variant --
 * confirm this is intended */
584 INSTR_DEF(this_instr,"xt_xmap_dist_dir_intercomm_new")
585 INSTR_START(this_instr);
586
587 // ensure that yaxt is initialized
588 assert(xt_initialized());
589
/* work on the communicators returned by xt_mpi_comm_smart_dup; each comes
 * with a tag offset that is added to the message tags used later */
590 int tag_offset_inter, tag_offset_intra;
591 MPI_Comm inter_comm = xt_mpi_comm_smart_dup(inter_comm_, &tag_offset_inter),
592 intra_comm = xt_mpi_comm_smart_dup(intra_comm_, &tag_offset_intra);
593
594 struct dd_result results
595 = exchange_idxlists(src_idxlist, dst_idxlist,
596 tag_offset_inter, tag_offset_intra,
597 inter_comm, intra_comm, config);
598
/* a value of 2 means: use what the bucket generator's init call returned */
600 if (stripify == 2)
601 stripify = results.stripify;
602 Xt_xmap (*xmap_new)(int num_src_intersections,
603 const struct Xt_com_list *src_com,
604 int num_dst_intersections,
605 const struct Xt_com_list *dst_com,
606 Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist,
607 MPI_Comm comm)
609
610 Xt_xmap xmap
611 = xmap_new(results.dist_dirs.src->num_entries,
612 results.dist_dirs.src->entries,
613 results.dist_dirs.dst->num_entries,
614 results.dist_dirs.dst->entries,
615 src_idxlist, dst_idxlist, inter_comm);
616
/* hand the communicators obtained from xt_mpi_comm_smart_dup back */
617 xt_mpi_comm_smart_dedup(&inter_comm, tag_offset_inter);
618 xt_mpi_comm_smart_dedup(&intra_comm, tag_offset_intra);
619
621 INSTR_STOP(this_instr);
622 return xmap;
623}
624
/* Convenience variant that forwards its arguments together with
 * &xt_default_config.
 *
 * NOTE(review): the lines with the return type, function name, leading
 * parameters and the start of the forwarding call are missing from this
 * listing.  The index at the end of the listing names this function
 * xt_xmap_dist_dir_intercomm_new and suggests the callee is
 * xt_xmap_dist_dir_intercomm_custom_new -- confirm against the original
 * source. */
627 MPI_Comm inter_comm, MPI_Comm intra_comm)
628{
630 src_idxlist, dst_idxlist, inter_comm, intra_comm, &xt_default_config);
631}
632
633/*
634 * Local Variables:
635 * c-basic-offset: 2
636 * coding: utf-8
637 * indent-tabs-mode: nil
638 * show-trailing-whitespace: t
639 * require-trailing-newline: t
640 * End:
641 */
int MPI_Comm
Definition core.h:64
#define INSTR_STOP(T)
Definition instr.h:69
#define INSTR_DEF(T, S)
Definition instr.h:66
#define INSTR_START(T)
Definition instr.h:68
add versions of standard API functions not returning on error
#define xmalloc(size)
Definition ppm_xfuncs.h:70
Xt_idxlist list
Definition xt_core.h:154
struct dist_dir_pair dist_dirs
struct dist_dir * dst
struct dist_dir * src
struct Xt_com_list entries[]
struct Xt_config_ xt_default_config
Definition xt_config.c:203
implementation of configuration object
#define XT_CONFIG_GET_FORCE_NOSORT(config)
#define XT_CONFIG_GET_DIST_DIR_STRIPING(config)
#define XT_CONFIG_BUCKET_DESTROY(config, bucket_gen_state)
int xt_initialized(void)
struct Xt_xmap_ * Xt_xmap
Definition xt_core.h:85
index list declaration
Xt_idxlist xt_idxlist_unpack(void *buffer, int buffer_size, int *position, MPI_Comm comm)
Xt_idxlist xt_idxlist_sorted_copy_custom(Xt_idxlist idxlist, Xt_config config)
Definition xt_idxlist.c:104
void xt_idxlist_pack(Xt_idxlist idxlist, void *buffer, int buffer_size, int *position, MPI_Comm comm)
Definition xt_idxlist.c:86
size_t xt_idxlist_get_pack_size(Xt_idxlist idxlist, MPI_Comm comm)
Definition xt_idxlist.c:80
Xt_idxlist xt_idxlist_get_intersection(Xt_idxlist idxlist_src, Xt_idxlist idxlist_dst)
int xt_idxlist_get_sorting(Xt_idxlist idxlist)
Definition xt_idxlist.c:359
void xt_idxlist_delete(Xt_idxlist idxlist)
Definition xt_idxlist.c:75
Provide non-public declarations common to all index lists.
#define xt_idxlist_get_num_indices(idxlist)
MPI_Comm xt_mpi_comm_smart_dup(MPI_Comm comm, int *tag_offset)
Definition xt_mpi.c:333
void xt_mpi_comm_smart_dedup(MPI_Comm *comm, int tag_offset)
Definition xt_mpi.c:386
utility routines for MPI
#define xt_mpi_call(call, comm)
Definition xt_mpi.h:68
@ xt_mpi_tag_xmap_dist_dir_src_send
exchange map declarations
@ Xt_dist_dir_bucket_gen_type_send
@ Xt_dist_dir_bucket_gen_type_recv
Default bucket generator for creation of distributed directories.
int xt_com_list_rank_cmp(const void *a_, const void *b_)
int xt_xmdd_cmp_isect_src_rank(const void *a_, const void *b_)
int xt_xmdd_cmp_isect_dst_rank(const void *a_, const void *b_)
void xt_xmdd_free_dist_dirs(struct dist_dir_pair dist_dirs)
size_t xt_xmap_dist_dir_match_src_dst(const struct dist_dir *src_dist_dir, const struct dist_dir *dst_dist_dir, struct isect **src_dst_intersections, Xt_config config)
void xt_xmap_dist_dir_same_rank_merge(struct dist_dir **dist_dir_results)
size_t xt_xmap_dist_dir_pack_intersections(enum xt_xmdd_direction target, size_t num_intersections, const struct isect *restrict src_dst_intersections, bool isect_idxlist_delete, size_t send_size_asize, size_t send_size_idx, int(*send_size)[send_size_asize], unsigned char *buffer, size_t buf_size, size_t *ofs, MPI_Comm comm)
Utility functions for creation of distributed directories.
@ xt_xmdd_direction_dst
@ xt_xmdd_direction_src
static void * create_intersections(void *bucket_gen_state, int bucket_type, struct Xt_xmdd_txstat tx_stat[2], int recv_size[][SEND_SIZE_ASIZE], int send_size[][SEND_SIZE_ASIZE], Xt_idxlist idxlist, MPI_Comm comm, int comm_size, Xt_config config)
static struct dd_result exchange_idxlists(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, int tag_offset_inter, int tag_offset_intra, MPI_Comm inter_comm, MPI_Comm intra_comm, Xt_config config)
static void rank_no_send(size_t rank, int(*restrict send_size)[SEND_SIZE_ASIZE])
static struct dd_result generate_distributed_directories(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, int tag_offset_inter, int tag_offset_intra, MPI_Comm inter_comm, MPI_Comm intra_comm, int remote_size, int comm_size, Xt_config config)
static void irecv_intersections(size_t num_msg, const int(*recv_size)[SEND_SIZE_ASIZE], void *recv_buffer, MPI_Request *requests, int tag, MPI_Comm comm)
static struct dist_dir * unpack_dist_dir_results(struct Xt_xmdd_txstat tx_stat, void *restrict recv_buffer, int *restrict entry_counts, MPI_Comm comm)
static void isend_intersections(size_t num_msg, const int(*send_size)[SEND_SIZE_ASIZE], void *send_buffer, MPI_Request *requests, int tag, MPI_Comm comm)
int(* tx_fp)(void *, int, MPI_Datatype, int, int, MPI_Comm, MPI_Request *)
static struct mmsg_buf compute_and_pack_bucket_intersections(void *bucket_gen_state, int bucket_type, Xt_idxlist idxlist, int(*send_size)[SEND_SIZE_ASIZE], MPI_Comm comm, int comm_size, Xt_config config)
static size_t send_size_from_intersections(size_t num_intersections, const struct isect *restrict src_dst_intersections, enum xt_xmdd_direction target, MPI_Comm comm, int comm_size, int(*restrict send_size_target)[SEND_SIZE_ASIZE])
static void tx_intersections(size_t num_msg, const int(*sizes)[SEND_SIZE_ASIZE], unsigned char *buffer, MPI_Request *requests, int tag, MPI_Comm comm, tx_fp tx_op)
static struct dist_dir * unpack_dist_dir(struct Xt_xmdd_txstat tx_stat, const int(*sizes)[SEND_SIZE_ASIZE], void *buffer, MPI_Comm comm)
static struct mmsg_buf pack_dist_dirs(size_t num_intersections, struct isect *restrict src_dst_intersections, int(*send_size)[SEND_SIZE_ASIZE], enum xt_xmdd_direction target, bool isect_idxlist_delete, MPI_Comm comm, int comm_size)
static void compress_sizes(int(*restrict sizes)[SEND_SIZE_ASIZE], int comm_size, struct Xt_xmdd_txstat *tx_stat, int *counts)
Xt_xmap xt_xmap_dist_dir_intercomm_new(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm inter_comm, MPI_Comm intra_comm)
Xt_xmap xt_xmap_dist_dir_intercomm_custom_new(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm inter_comm, MPI_Comm intra_comm, Xt_config config)
Xt_xmap xt_xmap_intersection_ext_new(int num_src_intersections, const struct Xt_com_list src_com[num_src_intersections], int num_dst_intersections, const struct Xt_com_list dst_com[num_dst_intersections], Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm)
Xt_xmap xt_xmap_intersection_new(int num_src_intersections, const struct Xt_com_list src_com[num_src_intersections], int num_dst_intersections, const struct Xt_com_list dst_com[num_dst_intersections], Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm)