Yet Another eXchange Tool 0.11.1
Loading...
Searching...
No Matches
xt_xmap_dist_dir.c
Go to the documentation of this file.
1
12/*
13 * Keywords:
14 * Maintainer: Jörg Behrens <behrens@dkrz.de>
15 * Moritz Hanke <hanke@dkrz.de>
16 * Thomas Jahns <jahns@dkrz.de>
17 * URL: https://dkrz-sw.gitlab-pages.dkrz.de/yaxt/
18 *
19 * Redistribution and use in source and binary forms, with or without
20 * modification, are permitted provided that the following conditions are
21 * met:
22 *
23 * Redistributions of source code must retain the above copyright notice,
24 * this list of conditions and the following disclaimer.
25 *
26 * Redistributions in binary form must reproduce the above copyright
27 * notice, this list of conditions and the following disclaimer in the
28 * documentation and/or other materials provided with the distribution.
29 *
30 * Neither the name of the DKRZ GmbH nor the names of its contributors
31 * may be used to endorse or promote products derived from this software
32 * without specific prior written permission.
33 *
34 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
35 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
36 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
37 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
38 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
39 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
40 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
41 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
42 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
43 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
44 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
45 */
46#ifdef HAVE_CONFIG_H
47#include <config.h>
48#endif
49
50#include <stdbool.h>
51#include <stdlib.h>
52#include <stdio.h>
53#include <string.h>
54#include <assert.h>
55#include <limits.h>
56
57#include <mpi.h>
58
59#include "xt/xt_idxlist.h"
60#include "xt_idxlist_internal.h"
62#include "xt/xt_idxvec.h"
63#include "xt/xt_idxstripes.h"
64#include "xt/xt_idxempty.h"
65#include "xt/xt_xmap.h"
66#include "xt/xt_xmap_dist_dir.h"
67#include "xt/xt_mpi.h"
68#include "xt_mpi_internal.h"
69#include "core/core.h"
70#include "core/ppm_xfuncs.h"
72#include "xt_config_internal.h"
73#include "instr.h"
75#include "xt/xt_sort.h"
77
78
79
80static const char filename[] = "xt_xmap_dist_dir.c";
81
82/* unfortunately GCC 11 to 13 cannot handle the literal constants used for
83 * MPI_STATUSES_IGNORE by MPICH */
84#if __GNUC__ >= 11 && __GNUC__ <= 13 && defined MPICH
85#pragma GCC diagnostic push
86#pragma GCC diagnostic ignored "-Wstringop-overread"
87#pragma GCC diagnostic ignored "-Wstringop-overflow"
88#endif
89
90enum {
96};
97
98static inline void
99rank_no_send(size_t rank, int (*restrict send_size)[SEND_SIZE_ASIZE])
100{
101 send_size[rank][SEND_SIZE_SRC] = 0;
102 send_size[rank][SEND_NUM_SRC] = 0;
103 send_size[rank][SEND_SIZE_DST] = 0;
104 send_size[rank][SEND_NUM_DST] = 0;
105}
106
107
110};
111
112static struct capbi_result
114 Xt_idxlist dst_idxlist,
115 int (*send_size)[SEND_SIZE_ASIZE],
116 void **send_buffer_,
117 MPI_Comm comm, int tag_offset,
118 int comm_size,
119 Xt_config config)
120{
121 size_t bgd_size = config->xmdd_bucket_gen->gen_state_size;
122 bgd_size = (bgd_size + sizeof (void *) - 1)/sizeof (void *) * sizeof (void *);
123 void *bgd[bgd_size];
124 int stripify =
125 config->xmdd_bucket_gen->init(
126 &bgd, src_idxlist, dst_idxlist, comm, tag_offset,
127 MPI_COMM_NULL, INT_MIN, config, config->xmdd_bucket_gen);
128
129 int nosort_forced = XT_CONFIG_GET_FORCE_NOSORT(config);
130 Xt_idxlist src_idxlist_sorted
131 = nosort_forced || xt_idxlist_get_sorting(src_idxlist) == 1
132 ? src_idxlist
133 : xt_idxlist_sorted_copy_custom(src_idxlist, config);
134 Xt_idxlist dst_idxlist_sorted
135 = nosort_forced || xt_idxlist_get_sorting(dst_idxlist) == 1
136 ? dst_idxlist
137 : xt_idxlist_sorted_copy_custom(dst_idxlist, config);
138
139 int num_msg = 0;
140 size_t max_num_intersect
141 = (size_t)config->xmdd_bucket_gen->get_intersect_max_num(
143 size_t send_size_filled = (size_t)-1;
144 if (max_num_intersect) {
145 size_t send_buffer_size = 0;
146 struct Xt_com_list *restrict sends_dst
147 = xmalloc(2 * max_num_intersect * sizeof(*sends_dst)),
148 *restrict sends_src = sends_dst + max_num_intersect;
149 size_t num_src_msg = 0, num_dst_msg = 0;
150 struct Xt_com_list bucket;
151 while ((bucket = config->xmdd_bucket_gen->next(
153 size_t rank;
154 for (rank = send_size_filled + 1; rank < (size_t)bucket.rank; ++rank)
155 rank_no_send(rank, send_size);
157 src_idxlist_sorted, bucket.list, config);
158 if (xt_idxlist_get_num_indices(send4src) > 0) {
159 sends_src[num_src_msg].list = send4src;
160 sends_src[num_src_msg].rank = (int)rank;
161 send_buffer_size += xt_idxlist_get_pack_size(send4src, comm);
162 send_size[rank][SEND_NUM_SRC] = 1;
163 ++num_src_msg;
164 } else {
165 send_size[rank][SEND_SIZE_SRC] = 0;
166 send_size[rank][SEND_NUM_SRC] = 0;
167 xt_idxlist_delete(send4src);
168 }
169
171 bucket.list, dst_idxlist_sorted, config);
172 if (xt_idxlist_get_num_indices(send4dst) > 0) {
173 sends_dst[num_dst_msg].list = send4dst;
174 sends_dst[num_dst_msg].rank = (int)rank;
175 send_buffer_size += xt_idxlist_get_pack_size(send4dst, comm);
176 send_size[rank][SEND_NUM_DST] = 1;
177 ++num_dst_msg;
178 } else {
179 send_size[rank][SEND_SIZE_DST] = 0;
180 send_size[rank][SEND_NUM_DST] = 0;
181 xt_idxlist_delete(send4dst);
182 }
183 send_size_filled = rank;
184 }
185 XT_CONFIG_BUCKET_DESTROY(config, &bgd);
186 for (size_t rank = send_size_filled+1; rank < (size_t)comm_size; ++rank)
187 rank_no_send(rank, send_size);
188 unsigned char *send_buffer
189 = *send_buffer_ = xmalloc(send_buffer_size);
190 size_t ofs = 0;
191 for (size_t i = 0; i < num_src_msg; ++i) {
192 int position = 0;
193 xt_idxlist_pack(sends_src[i].list, send_buffer+ofs,
194 (int)(send_buffer_size-ofs), &position, comm);
195 send_size[sends_src[i].rank][SEND_SIZE_SRC] = position;
196 ofs += (size_t)position;
197 xt_idxlist_delete(sends_src[i].list);
198 }
199
200 for (size_t i = 0; i < num_dst_msg; ++i) {
201 int position = 0;
202 xt_idxlist_pack(sends_dst[i].list, send_buffer+ofs,
203 (int)(send_buffer_size-ofs), &position, comm);
204 send_size[sends_dst[i].rank][SEND_SIZE_DST] = position;
205 ofs += (size_t)position;
206 xt_idxlist_delete(sends_dst[i].list);
207 }
208 free(sends_dst);
209 num_msg = (int)(num_src_msg + num_dst_msg);
210 } else {
211 XT_CONFIG_BUCKET_DESTROY(config, &bgd);
212 memset(send_size, 0, (size_t)comm_size * sizeof (*send_size));
213 }
214
215 if (dst_idxlist_sorted != dst_idxlist)
216 xt_idxlist_delete(dst_idxlist_sorted);
217 if (src_idxlist_sorted != src_idxlist)
218 xt_idxlist_delete(src_idxlist_sorted);
219
220 return (struct capbi_result){ .stripify = stripify, .num_msg = num_msg };
221}
222
223static void
225 int recv_count, void * recv_buffer, int tag,
226 MPI_Comm comm) {
227
228 // initialize distributed directories
229 int total_recv_size = 0;
230
231 for (int i = 0; i < recv_count; ++i)
232 {
233 MPI_Status status;
234
235 xt_mpi_call(MPI_Recv(recv_buffer, recv_size, MPI_PACKED, MPI_ANY_SOURCE,
236 tag, comm, &status), comm);
237
238 int received_count;
239 xt_mpi_call(MPI_Get_count(&status, MPI_PACKED, &received_count), comm);
240
241 int position = 0;
242
243 dist_dir->entries[i].rank = status.MPI_SOURCE;
244 dist_dir->entries[i].list =
245 xt_idxlist_unpack(recv_buffer, received_count, &position, comm);
246
247 total_recv_size += received_count;
248 }
249
250 if (total_recv_size != recv_size)
251 Xt_abort(comm, "ERROR: recv_intersections received wrong number of bytes",
252 filename, __LINE__);
253 dist_dir->num_entries = recv_count;
254}
255
256static void send_intersections(void *send_buffer,
257 const int (*send_size)[SEND_SIZE_ASIZE],
258 MPI_Request *dir_init_send_requests,
259 int tag_offset, MPI_Comm comm, int comm_size) {
260 int src_tag = tag_offset + xt_mpi_tag_xmap_dist_dir_src_send;
261 struct Xt_xmdd_txstat txstat
264 src_tag, comm, comm_size,
265 dir_init_send_requests,
266 send_size);
267 int dst_tag = tag_offset + xt_mpi_tag_xmap_dist_dir_dst_send;
268 xt_xmap_dist_dir_send_intersections((unsigned char *)send_buffer
269 + txstat.bytes,
271 dst_tag, comm, comm_size,
272 dir_init_send_requests + txstat.num_msg,
273 send_size);
274}
275
276static struct dist_dir_pair
278 int tag_offset, MPI_Comm comm) {
279
280 struct dist_dir *src_dist_dir, *dst_dist_dir;
281 src_dist_dir = xmalloc(sizeof (struct dist_dir)
282 + (sizeof (struct Xt_com_list)
283 * (size_t)recv_size[SEND_NUM_SRC]));
284 dst_dist_dir = xmalloc(sizeof (struct dist_dir)
285 + (sizeof (struct Xt_com_list)
286 * (size_t)recv_size[SEND_NUM_DST]));
287
288 void * recv_buffer = xmalloc((size_t)MAX(recv_size[SEND_SIZE_SRC],
289 recv_size[SEND_SIZE_DST]));
290
291 recv_and_unpack_intersection(src_dist_dir, recv_size[SEND_SIZE_SRC],
292 recv_size[SEND_NUM_SRC], recv_buffer,
294 comm);
295 recv_and_unpack_intersection(dst_dist_dir, recv_size[SEND_SIZE_DST],
296 recv_size[SEND_NUM_DST], recv_buffer,
298 comm);
299
300 free(recv_buffer);
301 return (struct dist_dir_pair){ .src = src_dist_dir,
302 .dst = dst_dist_dir };
303}
304
305
306static size_t
307buf_size_from_intersections(size_t num_intersections,
308 const struct isect *restrict src_dst_intersections,
309 MPI_Comm comm, int comm_size,
310 int (*restrict send_size)[SEND_SIZE_ASIZE])
311{
312 size_t total_send_size = 0;
313 for (int i = 0; i < comm_size; ++i)
314 (void)(send_size[i][SEND_SIZE_SRC] = 0),
315 (void)(send_size[i][SEND_SIZE_DST] = 0),
316 (void)(send_size[i][SEND_NUM_SRC] = 0),
317 (void)(send_size[i][SEND_NUM_DST] = 0);
318
319 int rank_pack_size;
320 xt_mpi_call(MPI_Pack_size(1, MPI_INT, comm, &rank_pack_size), comm);
321
322 for (size_t i = 0; i < num_intersections; ++i)
323 {
324 int msg_size = rank_pack_size
325 + (int)xt_idxlist_get_pack_size(src_dst_intersections[i].idxlist,
326 comm);
327 size_t src_rank
328 = (size_t)src_dst_intersections[i].rank[xt_xmdd_direction_src],
329 dst_rank = (size_t)src_dst_intersections[i].rank[xt_xmdd_direction_dst];
330 /* send_size[i][SEND_SIZE_(SRC|DST)] are set when actually
331 * packing, because that provides a potentially tighter bound,
332 * see xt_xmap_dist_dir_pack_intersections */
333 ++(send_size[src_rank][SEND_NUM_SRC]);
334 ++(send_size[dst_rank][SEND_NUM_DST]);
335 total_send_size += 2*(size_t)msg_size;
336 }
337 assert(total_send_size <= INT_MAX);
338 return total_send_size;
339}
340
341
342static int
343pack_src_dst_dist_dirs(size_t num_intersections,
344 struct isect *restrict src_dst_intersections,
345 int (*send_size)[SEND_SIZE_ASIZE],
346 void **send_buffer_,
347 MPI_Comm comm, int comm_size) {
348
349 size_t total_send_size
350 = buf_size_from_intersections(num_intersections,
351 src_dst_intersections,
352 comm, comm_size, send_size);
353
354 unsigned char *send_buffer = (*send_buffer_)
355 = xmalloc((size_t)total_send_size);
356 size_t ofs = 0;
357 if (num_intersections > 1)
358 qsort(src_dst_intersections, num_intersections,
359 sizeof (src_dst_intersections[0]), xt_xmdd_cmp_isect_src_rank);
360 size_t num_send_indices_requests
362 xt_xmdd_direction_src, num_intersections, src_dst_intersections, false,
363 SEND_SIZE_ASIZE, SEND_SIZE_SRC, send_size,
364 send_buffer, total_send_size, &ofs, comm);
365
366 if (num_intersections > 1)
367 qsort(src_dst_intersections, num_intersections,
368 sizeof (src_dst_intersections[0]), xt_xmdd_cmp_isect_dst_rank);
369 num_send_indices_requests
371 xt_xmdd_direction_dst, num_intersections, src_dst_intersections, true,
372 SEND_SIZE_ASIZE, SEND_SIZE_DST, send_size,
373 send_buffer, total_send_size, &ofs, comm);
374 assert(num_send_indices_requests <= INT_MAX);
375 return (int)num_send_indices_requests;
376}
377
388static void
390 int recv_size[num_sizes],
391 int (*send_size)[num_sizes],
392 MPI_Comm comm) {
393
394#if MPI_VERSION > 2 || ( MPI_VERSION == 2 && MPI_SUBVERSION >= 2)
395 xt_mpi_call(MPI_Reduce_scatter_block((int *)send_size, (int *)recv_size,
396 num_sizes, MPI_INT, MPI_SUM,
397 comm), comm);
398#else
399 int comm_size;
400 xt_mpi_call(MPI_Comm_size(comm, &comm_size), comm);
401
402 int *recv_count = xmalloc((size_t)comm_size * sizeof(*recv_count));
403 for (int i = 0; i < comm_size; ++i) recv_count[i] = num_sizes;
404
405 xt_mpi_call(MPI_Reduce_scatter(send_size, recv_size, recv_count, MPI_INT,
406 MPI_SUM, comm), comm);
407
408 free(recv_count);
409#endif
410}
411
416
417static struct dd_result
419 Xt_idxlist dst_idxlist,
420 int tag_offset,
421 MPI_Comm comm, int comm_size,
422 Xt_config config) {
423
424 void *send_buffer = NULL;
425
426 int (*send_size)[SEND_SIZE_ASIZE]
427 = xmalloc((size_t)comm_size * sizeof(*send_size));
428
429 struct capbi_result nms
430 = compute_and_pack_bucket_intersections(src_idxlist, dst_idxlist,
431 send_size, &send_buffer,
432 comm, tag_offset, comm_size,
433 config);
434
435 int recv_size[SEND_SIZE_ASIZE]; // for src and dst
436
437 /* get packed intersection sizes to be sent from other ranks */
438 xt_xmap_dist_dir_reduce_scatter_sizes(SEND_SIZE_ASIZE, recv_size, send_size, comm);
439
440 MPI_Request *dir_init_send_requests
441 = xmalloc((size_t)nms.num_msg * sizeof(*dir_init_send_requests));
442 send_intersections(send_buffer, (const int (*)[SEND_SIZE_ASIZE])send_size,
443 dir_init_send_requests, tag_offset, comm, comm_size);
444
445 free(send_size);
446
448 = recv_and_unpack_intersections(recv_size, tag_offset, comm);
449
450 // wait for the sends to be completed
451 xt_mpi_call(MPI_Waitall(nms.num_msg, dir_init_send_requests,
452 MPI_STATUSES_IGNORE), comm);
453 free(dir_init_send_requests);
454 free(send_buffer);
455 return (struct dd_result){ .dist_dirs.dst = dist_dirs.dst,
456 .dist_dirs.src = dist_dirs.src,
457 .stripify = nms.stripify };
458}
459
460static void
462 void *restrict recv_buffer, int tag,
463 MPI_Comm comm)
464{
465
466 // initiate distributed directories
467 int num_entries = 0;
468 while (recv_size > 0) {
469
470 MPI_Status status;
471
472 xt_mpi_call(MPI_Recv(recv_buffer, recv_size, MPI_PACKED,
473 MPI_ANY_SOURCE, tag, comm, &status), comm);
474
475 int received_count;
476 xt_mpi_call(MPI_Get_count(&status, MPI_PACKED, &received_count), comm);
477
478 recv_size -= received_count;
479
480 int position = 0;
481
482 while (received_count > position) {
483
484 xt_mpi_call(MPI_Unpack(recv_buffer, received_count, &position,
485 &dist_dir->entries[num_entries].rank,
486 1, MPI_INT, comm), comm);
487
488 dist_dir->entries[num_entries].list =
489 xt_idxlist_unpack(recv_buffer, received_count, &position, comm);
490
491 ++num_entries;
492 }
493 }
494 qsort(dist_dir->entries, (size_t)num_entries, sizeof(*dist_dir->entries),
496
497 if (0 != recv_size)
498 Xt_abort(comm, "ERROR: recv_and_unpack_dist_dir_result"
499 " received wrong number of bytes", filename, __LINE__);
500
501 dist_dir->num_entries = num_entries;
502
503}
504
505
506static struct dist_dir_pair
508 int *num_send_indices_requests,
509 MPI_Request *send_indices_requests,
510 int tag_offset,
511 MPI_Comm comm) {
512
513 struct dist_dir_pair dist_dir_results;
514 dist_dir_results.src
515 = xmalloc(sizeof (struct dist_dir)
516 + (sizeof (struct Xt_com_list)
517 * (size_t)recv_size[SEND_NUM_SRC]));
518 dist_dir_results.dst
519 = xmalloc(sizeof (struct dist_dir)
520 + (sizeof (struct Xt_com_list)
521 * (size_t)recv_size[SEND_NUM_DST]));
522
523 void *recv_buffer = xmalloc((size_t)MAX(recv_size[SEND_SIZE_SRC],
524 recv_size[SEND_SIZE_DST]));
525
526 recv_and_unpack_dist_dir_result(dist_dir_results.src,
527 recv_size[SEND_SIZE_SRC],
528 recv_buffer, tag_offset
530 assert(dist_dir_results.src->num_entries == recv_size[SEND_NUM_SRC]);
531
532 enum { ops_completed_auto_size = 16 };
533 int ops_completed_auto[ops_completed_auto_size];
534 int *ops_completed
535 = *num_send_indices_requests > ops_completed_auto_size
536 ? xmalloc((size_t)*num_send_indices_requests * sizeof (*ops_completed))
537 : ops_completed_auto;
538 bool all_sends_done
539 = xt_mpi_test_some(num_send_indices_requests, send_indices_requests,
540 ops_completed, comm);
541
542 recv_and_unpack_dist_dir_result(dist_dir_results.dst,
543 recv_size[SEND_SIZE_DST],
544 recv_buffer, tag_offset
546 assert(dist_dir_results.dst->num_entries == recv_size[SEND_NUM_DST]);
547
548 if (!all_sends_done)
549 all_sends_done
550 = xt_mpi_test_some(num_send_indices_requests, send_indices_requests,
551 ops_completed, comm);
552 free(recv_buffer);
553
554 xt_xmap_dist_dir_same_rank_merge(&dist_dir_results.src);
555
556 if (!all_sends_done)
557 all_sends_done
558 = xt_mpi_test_some(num_send_indices_requests, send_indices_requests,
559 ops_completed, comm);
560
561 xt_xmap_dist_dir_same_rank_merge(&dist_dir_results.dst);
562 if (ops_completed != ops_completed_auto) free(ops_completed);
563 return dist_dir_results;
564}
565
566static struct dd_result
568 Xt_idxlist dst_idxlist,
569 int tag_offset,
570 MPI_Comm comm,
571 Xt_config config)
572{
573
574 int comm_size;
575
576 xt_mpi_call(MPI_Comm_size(comm, &comm_size), comm);
577
578 struct dd_result dds
579 = generate_distributed_directories(src_idxlist, dst_idxlist,
580 tag_offset, comm, comm_size, config);
581
582 void * send_buffer;
583
584 int recv_size[SEND_SIZE_ASIZE], (*send_size)[SEND_SIZE_ASIZE]
585 = xmalloc((size_t)comm_size * sizeof(*send_size));
586
587 /* match the source and destination entries in the local distributed
588 * directories... */
589 struct isect *src_dst_intersections;
590 size_t num_intersections
592 &src_dst_intersections, config);
594 /* ... and pack the results into a sendable format */
595 int num_send_indices_requests
596 = pack_src_dst_dist_dirs(num_intersections, src_dst_intersections,
597 send_size, &send_buffer, comm, comm_size);
598 free(src_dst_intersections);
599
600 // get the data size the local process will receive from other processes
602 send_size, comm);
603
604 MPI_Request *send_indices_requests
605 = xmalloc((size_t)num_send_indices_requests
606 * sizeof(*send_indices_requests));
607
608 send_intersections(send_buffer, (const int (*)[SEND_SIZE_ASIZE])send_size,
609 send_indices_requests, tag_offset, comm, comm_size);
610
611 struct dist_dir_pair ddp
613 &num_send_indices_requests,
614 send_indices_requests,
615 tag_offset, comm);
616
617 xt_mpi_call(MPI_Waitall(num_send_indices_requests, send_indices_requests,
618 MPI_STATUSES_IGNORE), comm);
619
620 free(send_buffer);
621 free(send_size);
622 free(send_indices_requests);
623 return (struct dd_result){ .dist_dirs.dst = ddp.dst,
624 .dist_dirs.src = ddp.src,
625 .stripify = dds.stripify };
626}
627
630 Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist,
631 MPI_Comm comm, Xt_config config)
632{
633 INSTR_DEF(this_instr,"xt_xmap_all2all_new")
634 INSTR_START(this_instr);
635
636 // ensure that yaxt is initialized
637 assert(xt_initialized());
638
639 int tag_offset;
640 MPI_Comm newcomm = xt_mpi_comm_smart_dup(comm, &tag_offset);
641
642 struct dd_result ddr
643 = exchange_idxlists(src_idxlist, dst_idxlist, tag_offset, newcomm, config);
644
646 if (stripify == 2)
647 stripify = ddr.stripify;
648 Xt_xmap (*xmap_new)(int num_src_intersections,
649 const struct Xt_com_list *src_com,
650 int num_dst_intersections,
651 const struct Xt_com_list *dst_com,
652 Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist,
653 MPI_Comm comm, Xt_config config)
654 = stripify
657
658
659 Xt_xmap xmap
660 = xmap_new(ddr.dist_dirs.src->num_entries, ddr.dist_dirs.src->entries,
662 src_idxlist, dst_idxlist, newcomm, config);
663
664 xt_mpi_comm_smart_dedup(&newcomm, tag_offset);
665
667 INSTR_STOP(this_instr);
668 return xmap;
669}
670
673 MPI_Comm comm)
674{
675 return xt_xmap_dist_dir_intracomm_custom_new(src_idxlist, dst_idxlist, comm,
677}
678
679/*
680 * Local Variables:
681 * c-basic-offset: 2
682 * coding: utf-8
683 * indent-tabs-mode: nil
684 * show-trailing-whitespace: t
685 * require-trailing-newline: t
686 * End:
687 */
@ MPI_COMM_NULL
Definition core.h:74
int MPI_Comm
Definition core.h:64
#define INSTR_STOP(T)
Definition instr.h:69
#define INSTR_DEF(T, S)
Definition instr.h:66
#define INSTR_START(T)
Definition instr.h:68
add versions of standard API functions not returning on error
#define xmalloc(size)
Definition ppm_xfuncs.h:70
Xt_idxlist list
Definition xt_core.h:154
const struct Xt_xmdd_bucket_gen_ * xmdd_bucket_gen
Xt_xmdd_bucket_gen_init_state_internal init
Xt_xmdd_bucket_gen_get_intersect_max_num get_intersect_max_num
struct dist_dir_pair dist_dirs
struct dist_dir * dst
struct dist_dir * src
struct Xt_com_list entries[]
struct Xt_config_ xt_default_config
Definition xt_config.c:203
implementation of configuration object
#define XT_CONFIG_GET_FORCE_NOSORT(config)
#define XT_CONFIG_GET_DIST_DIR_STRIPING(config)
#define XT_CONFIG_BUCKET_DESTROY(config, bucket_gen_state)
int xt_initialized(void)
struct Xt_xmap_ * Xt_xmap
Definition xt_core.h:85
#define MAX(a, b)
Definition xt_cuda.c:68
index list declaration
Xt_idxlist xt_idxlist_unpack(void *buffer, int buffer_size, int *position, MPI_Comm comm)
Xt_idxlist xt_idxlist_sorted_copy_custom(Xt_idxlist idxlist, Xt_config config)
Definition xt_idxlist.c:104
void xt_idxlist_pack(Xt_idxlist idxlist, void *buffer, int buffer_size, int *position, MPI_Comm comm)
Definition xt_idxlist.c:86
size_t xt_idxlist_get_pack_size(Xt_idxlist idxlist, MPI_Comm comm)
Definition xt_idxlist.c:80
Xt_idxlist xt_idxlist_get_intersection_custom(Xt_idxlist idxlist_src, Xt_idxlist idxlist_dst, Xt_config config)
int xt_idxlist_get_sorting(Xt_idxlist idxlist)
Definition xt_idxlist.c:359
void xt_idxlist_delete(Xt_idxlist idxlist)
Definition xt_idxlist.c:75
Provide non-public declarations common to all index lists.
#define xt_idxlist_get_num_indices(idxlist)
MPI_Comm xt_mpi_comm_smart_dup(MPI_Comm comm, int *tag_offset)
Definition xt_mpi.c:333
void xt_mpi_comm_smart_dedup(MPI_Comm *comm, int tag_offset)
Definition xt_mpi.c:386
bool xt_mpi_test_some(int *restrict num_req, MPI_Request *restrict req, int *restrict ops_completed, MPI_Comm comm)
Definition xt_mpi.c:415
utility routines for MPI
#define xt_mpi_call(call, comm)
Definition xt_mpi.h:68
@ xt_mpi_tag_xmap_dist_dir_dst_send
@ xt_mpi_tag_xmap_dist_dir_src_send
exchange map declarations
static const char filename[]
static void rank_no_send(size_t rank, int(*restrict send_size)[SEND_SIZE_ASIZE])
static struct dist_dir_pair recv_and_unpack_intersections(int recv_size[SEND_SIZE_ASIZE], int tag_offset, MPI_Comm comm)
static void recv_and_unpack_intersection(struct dist_dir *dist_dir, int recv_size, int recv_count, void *recv_buffer, int tag, MPI_Comm comm)
Xt_xmap xt_xmap_dist_dir_intracomm_custom_new(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm, Xt_config config)
static struct dd_result exchange_idxlists(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, int tag_offset, MPI_Comm comm, Xt_config config)
Xt_xmap xt_xmap_dist_dir_intracomm_new(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm)
static struct dd_result generate_distributed_directories(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, int tag_offset, MPI_Comm comm, int comm_size, Xt_config config)
@ SEND_SIZE_SRC
@ SEND_NUM_DST
@ SEND_NUM_SRC
@ SEND_SIZE_ASIZE
@ SEND_SIZE_DST
static void xt_xmap_dist_dir_reduce_scatter_sizes(int num_sizes, int recv_size[num_sizes], int(*send_size)[num_sizes], MPI_Comm comm)
wrapper for MPI_Reduce_scatter_block if available or MPI_Reduce_scatter if not
static size_t buf_size_from_intersections(size_t num_intersections, const struct isect *restrict src_dst_intersections, MPI_Comm comm, int comm_size, int(*restrict send_size)[SEND_SIZE_ASIZE])
static struct capbi_result compute_and_pack_bucket_intersections(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, int(*send_size)[SEND_SIZE_ASIZE], void **send_buffer_, MPI_Comm comm, int tag_offset, int comm_size, Xt_config config)
static void send_intersections(void *send_buffer, const int(*send_size)[SEND_SIZE_ASIZE], MPI_Request *dir_init_send_requests, int tag_offset, MPI_Comm comm, int comm_size)
static void recv_and_unpack_dist_dir_result(struct dist_dir *dist_dir, int recv_size, void *restrict recv_buffer, int tag, MPI_Comm comm)
static struct dist_dir_pair recv_and_unpack_dist_dir_results(int recv_size[SEND_SIZE_ASIZE], int *num_send_indices_requests, MPI_Request *send_indices_requests, int tag_offset, MPI_Comm comm)
static int pack_src_dst_dist_dirs(size_t num_intersections, struct isect *restrict src_dst_intersections, int(*send_size)[SEND_SIZE_ASIZE], void **send_buffer_, MPI_Comm comm, int comm_size)
@ Xt_dist_dir_bucket_gen_type_sendrecv
Default bucket generator for creation of distributed directories.
int xt_com_list_rank_cmp(const void *a_, const void *b_)
int xt_xmdd_cmp_isect_src_rank(const void *a_, const void *b_)
int xt_xmdd_cmp_isect_dst_rank(const void *a_, const void *b_)
void xt_xmdd_free_dist_dirs(struct dist_dir_pair dist_dirs)
size_t xt_xmap_dist_dir_match_src_dst(const struct dist_dir *src_dist_dir, const struct dist_dir *dst_dist_dir, struct isect **src_dst_intersections, Xt_config config)
void xt_xmap_dist_dir_same_rank_merge(struct dist_dir **dist_dir_results)
size_t xt_xmap_dist_dir_pack_intersections(enum xt_xmdd_direction target, size_t num_intersections, const struct isect *restrict src_dst_intersections, bool isect_idxlist_delete, size_t send_size_asize, size_t send_size_idx, int(*send_size)[send_size_asize], unsigned char *buffer, size_t buf_size, size_t *ofs, MPI_Comm comm)
struct Xt_xmdd_txstat xt_xmap_dist_dir_send_intersections(void *restrict send_buffer, size_t send_size_asize, size_t send_size_entry, int tag, MPI_Comm comm, int rank_lim, MPI_Request *restrict requests, const int send_size[rank_lim][send_size_asize])
Utility functions for creation of distributed directories.
@ xt_xmdd_direction_dst
@ xt_xmdd_direction_src
#define MAX(a, b)
static struct dd_result generate_distributed_directories(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, int tag_offset_inter, int tag_offset_intra, MPI_Comm inter_comm, MPI_Comm intra_comm, int remote_size, int comm_size, Xt_config config)
static struct mmsg_buf compute_and_pack_bucket_intersections(void *bucket_gen_state, int bucket_type, Xt_idxlist idxlist, int(*send_size)[SEND_SIZE_ASIZE], MPI_Comm comm, int comm_size, Xt_config config)
Xt_xmap xt_xmap_intersection_ext_custom_new(int num_src_intersections, const struct Xt_com_list src_com[num_src_intersections], int num_dst_intersections, const struct Xt_com_list dst_com[num_dst_intersections], Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm, Xt_config config)
Xt_xmap xt_xmap_intersection_custom_new(int num_src_intersections, const struct Xt_com_list src_com[num_src_intersections], int num_dst_intersections, const struct Xt_com_list dst_com[num_dst_intersections], Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm, Xt_config config)