Yet Another eXchange Tool 0.11.1
Loading...
Searching...
No Matches
xt_xmap_dist_dir_common.c
Go to the documentation of this file.
1
15/*
16 * Keywords:
17 * Maintainer: Jörg Behrens <behrens@dkrz.de>
18 * Moritz Hanke <hanke@dkrz.de>
19 * Thomas Jahns <jahns@dkrz.de>
20 * URL: https://dkrz-sw.gitlab-pages.dkrz.de/yaxt/
21 *
22 * Redistribution and use in source and binary forms, with or without
23 * modification, are permitted provided that the following conditions are
24 * met:
25 *
26 * Redistributions of source code must retain the above copyright notice,
27 * this list of conditions and the following disclaimer.
28 *
29 * Redistributions in binary form must reproduce the above copyright
30 * notice, this list of conditions and the following disclaimer in the
31 * documentation and/or other materials provided with the distribution.
32 *
33 * Neither the name of the DKRZ GmbH nor the names of its contributors
34 * may be used to endorse or promote products derived from this software
35 * without specific prior written permission.
36 *
37 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
38 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
39 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
40 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
41 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
42 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
43 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
44 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
45 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
46 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
47 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
48 */
49#ifdef HAVE_CONFIG_H
50#include <config.h>
51#endif
52
53#include <string.h>
54#include <assert.h>
55
56#include "core/ppm_xfuncs.h"
57#include "xt/xt_idxstripes.h"
58#include "xt/xt_mpi.h"
59#include "xt/xt_xmap_dist_dir.h"
61#include "xt_arithmetic_util.h"
63#include "ensure_array_size.h"
64#include "xt_idxlist_internal.h"
66#include "xt_config_internal.h"
67
/*
 * Release a distributed directory: the index list of each of the first
 * num_entries elements of dist_dir->entries is deleted with
 * xt_idxlist_delete, then the dist_dir allocation itself is freed.
 * num_entries is taken from dist_dir->num_entries, or 0 when the
 * conditional below selects its third operand.
 *
 * NOTE(review): this listing skips source lines 69 and 71, i.e. the line
 * carrying the function name and parameter list, and the condition of
 * the ?: expression.  The symbol index gives the signature as
 *   static void xt_xmdd_free_dist_dir(struct dist_dir *dist_dir)
 * The condition presumably tests for a positive entry count -- confirm
 * against the original file.
 */
68static void
70 size_t num_entries
72 ? (size_t)dist_dir->num_entries : (size_t)0;
73 struct Xt_com_list *entries = dist_dir->entries;
74 for (size_t i = 0; i < num_entries; ++i)
75 xt_idxlist_delete(entries[i].list);
76 free(dist_dir);
77}
78
79void xt_xmdd_free_dist_dirs(struct dist_dir_pair dist_dirs) {
80 xt_xmdd_free_dist_dir(dist_dirs.src);
81 xt_xmdd_free_dist_dir(dist_dirs.dst);
82}
83
84
85struct Xt_xmdd_txstat
87 void *restrict send_buffer,
88 size_t send_size_asize, size_t send_size_entry,
89 int tag, MPI_Comm comm, int rank_lim,
90 MPI_Request *restrict requests,
91 const int send_size[rank_lim][send_size_asize])
92{
93 size_t offset = 0;
94 size_t reqOfs = 0;
95
96 // pack the intersections into the send buffer
97 for (size_t rank = 0; rank < (size_t)rank_lim; ++rank)
98 if (send_size[rank][send_size_entry] > 0) {
99 xt_mpi_call(MPI_Isend((char *)send_buffer + offset,
100 send_size[rank][send_size_entry],
101 MPI_PACKED, (int)rank, tag,
102 comm, requests + reqOfs),
103 comm);
104 ++reqOfs;
105 offset += (size_t)send_size[rank][send_size_entry];
106 }
107 return (struct Xt_xmdd_txstat){ .bytes = offset, .num_msg = reqOfs };
108}
109
110size_t
111xt_xmap_dist_dir_match_src_dst(const struct dist_dir *src_dist_dir,
112 const struct dist_dir *dst_dist_dir,
113 struct isect **src_dst_intersections,
114 Xt_config config)
115{
116 struct isect *src_dst_intersections_ = *src_dst_intersections
117 = xmalloc((size_t)src_dist_dir->num_entries
118 * (size_t)dst_dist_dir->num_entries
119 * sizeof(**src_dst_intersections));
120 size_t isect_fill = 0;
121 const struct Xt_com_list *restrict entries_src = src_dist_dir->entries,
122 *restrict entries_dst = dst_dist_dir->entries;
123 size_t num_entries_src = (size_t)src_dist_dir->num_entries,
124 num_entries_dst = (size_t)dst_dist_dir->num_entries;
125 for (size_t i = 0; i < num_entries_src; ++i)
126 for (size_t j = 0; j < num_entries_dst; ++j)
127 {
128 Xt_idxlist intersection
130 entries_src[i].list, entries_dst[j].list, config);
131 if (xt_idxlist_get_num_indices(intersection) > 0) {
132 src_dst_intersections_[isect_fill]
133 = (struct isect){
134 .rank = { [xt_xmdd_direction_src]=entries_src[i].rank,
135 [xt_xmdd_direction_dst]=entries_dst[j].rank},
136 .idxlist = intersection };
137 ++isect_fill;
138 } else
139 xt_idxlist_delete(intersection);
140 }
141 *src_dst_intersections
142 = xrealloc(src_dst_intersections_,
143 isect_fill * sizeof (*src_dst_intersections_));
144 return isect_fill;
145}
146
147#if __GNUC__ == 11 && __GNUC_MINOR__ <= 2
148/* gcc 11.1 has a bug in the -fsanitize=undefined functionality
149 * which creates a bogus warning without the below suppression, bug
150 * report at https://gcc.gnu.org/bugzilla/show_bug.cgi?id=101585 */
151#pragma GCC diagnostic push
152#pragma GCC diagnostic ignored "-Wvla-parameter"
153#endif
154
155size_t
157 enum xt_xmdd_direction target,
158 size_t num_intersections,
159 const struct isect *restrict src_dst_intersections,
160 bool isect_idxlist_delete,
161 size_t send_size_asize, size_t send_size_idx,
162 int (*send_size)[send_size_asize],
163 unsigned char *buffer, size_t buf_size, size_t *ofs, MPI_Comm comm)
164{
165 int prev_send_rank = -1;
166 size_t num_send_indices_requests = 0;
167 size_t origin = 1 ^ target, ofs_ = *ofs;
168 int position = 0;
169 for (size_t i = 0; i < num_intersections; ++i)
170 {
171 /* see if this generates a new request? */
172 int send_rank = src_dst_intersections[i].rank[target];
173 num_send_indices_requests += send_rank != prev_send_rank;
174
175 // pack rank
176 XT_MPI_SEND_BUF_CONST int *prank
177 = CAST_MPI_SEND_BUF(src_dst_intersections[i].rank + origin);
178 if (send_rank != prev_send_rank && prev_send_rank != -1) {
179 send_size[prev_send_rank][send_size_idx] = position;
180 ofs_ += (size_t)position;
181 position = 0;
182 }
183 prev_send_rank = send_rank;
184 xt_mpi_call(MPI_Pack(prank, 1, MPI_INT, buffer+ofs_, (int)(buf_size-ofs_),
185 &position, comm), comm);
186 // pack intersection
187 xt_idxlist_pack(src_dst_intersections[i].idxlist, buffer+ofs_,
188 (int)(buf_size-ofs_), &position, comm);
189
190 if (isect_idxlist_delete)
191 xt_idxlist_delete(src_dst_intersections[i].idxlist);
192 }
193 if (prev_send_rank != -1)
194 send_size[prev_send_rank][send_size_idx] = position;
195
196 *ofs = ofs_ + (size_t)position;
197 return num_send_indices_requests;
198}
199
200#if __GNUC__ == 11 && __GNUC_MINOR__ <= 2
201#pragma GCC diagnostic pop
202#endif
203
204
205static int
206stripe_cmp(const void *a, const void *b)
207{
208 typedef const struct Xt_stripe *csx;
209 return (((csx)a)->start > ((csx)b)->start)
210 - (((csx)b)->start > ((csx)a)->start);
211}
212
213/*
214 * @param dist_dir_results contains the intersections of this ranks
215 * dst or src idxlist with other ranks in bucket-sized chunks, the
216 * chunks belonging to the same communication partner are merged
217 * in-place here, i.e. on return (*dist_dir_results)->num_entries is
218 * less than or equal to the previous count and *dist_dir_results
219 * might point somewhere else
220 */
221void
222xt_xmap_dist_dir_same_rank_merge(struct dist_dir **dist_dir_results) {
223
224 struct Xt_com_list *restrict entries = (*dist_dir_results)->entries;
225 size_t num_isect_agg = 0;
226
227 size_t i = 0, num_shards = (size_t)(*dist_dir_results)->num_entries;
228 while (i < num_shards) {
229 int rank = entries[i].rank;
230 size_t j = i;
231 size_t num_stripes = 0;
232 /* find all entries matching the currently considered rank and
233 * count their stripes */
234 do {
235 num_stripes
236 += (size_t)(xt_idxlist_get_num_index_stripes(entries[j].list));
237 ++j;
238 } while (j < num_shards && entries[j].rank == rank);
239 struct Xt_stripes_alloc stripes_alloc = xt_idxstripes_alloc(num_stripes);
240 struct Xt_stripe *restrict stripes = stripes_alloc.stripes;
241 size_t stripe_ofs = 0;
242 for (; i < j; ++i) {
243 size_t num_stripes_of_intersection
244 = (size_t)(xt_idxlist_get_num_index_stripes(entries[i].list));
246 stripes+stripe_ofs,
247 num_stripes-stripe_ofs);
248 xt_idxlist_delete(entries[i].list);
249 stripe_ofs += num_stripes_of_intersection;
250 }
251 qsort(stripes, num_stripes, sizeof (*stripes), stripe_cmp);
252 entries[num_isect_agg].list = xt_idxstripes_congeal(stripes_alloc);
253 entries[num_isect_agg].rank = rank;
254 ++num_isect_agg;
255 }
256 (*dist_dir_results)->num_entries = (int)num_isect_agg;
257 *dist_dir_results = xrealloc(*dist_dir_results, sizeof (struct dist_dir)
258 + (size_t)num_isect_agg
259 * sizeof(struct Xt_com_list));
260}
261
262
/*
 * Comparator with the (const void *, const void *) signature used by
 * qsort; both arguments are viewed as struct isect.
 *
 * NOTE(review): this listing jumps from source line 267 to 269, so the
 * return statement is not visible here.  Going by the function name
 * and the remark below it presumably returns the difference of the two
 * source ranks -- confirm against the original file.
 */
263int
264xt_xmdd_cmp_isect_src_rank(const void *a_, const void *b_)
265{
266 const struct isect *a = a_, *b = b_;
267 /* this is safe vs. overflow because ranks are in [0..INT_MAX) */
269}
270
/*
 * Comparator with the (const void *, const void *) signature used by
 * qsort; both arguments are viewed as struct isect.
 *
 * NOTE(review): this listing jumps from source line 275 to 277, so the
 * return statement is not visible here.  Going by the function name
 * and the remark below it presumably returns the difference of the two
 * destination ranks -- confirm against the original file.
 */
271int
272xt_xmdd_cmp_isect_dst_rank(const void *a_, const void *b_)
273{
274 const struct isect *a = a_, *b = b_;
275 /* this is safe vs. overflow because ranks are in [0..INT_MAX) */
277}
278
279int
280xt_com_list_rank_cmp(const void *a_, const void *b_)
281{
282 const struct Xt_com_list *a = a_, *b = b_;
283 /* this is overflow-safe because rank's are non-negative ints */
284 return a->rank - b->rank;
285}
286
289 MPI_Comm comm)
290{
 /* Forwards the caller's index lists and communicator to
  * xt_xmap_dist_dir_custom_new and returns its result.
  *
  * NOTE(review): source lines 287-288 (return type, function name and
  * leading parameters) and 292 (the remaining argument of the call
  * below) are absent from this listing.  The symbol index gives the
  * signature as
  *   Xt_xmap xt_xmap_dist_dir_new(Xt_idxlist src_idxlist,
  *                                Xt_idxlist dst_idxlist, MPI_Comm comm)
  * The missing argument is presumably a default configuration --
  * confirm against the original file. */
291 return xt_xmap_dist_dir_custom_new(src_idxlist, dst_idxlist, comm,
293}
294
297 MPI_Comm comm, Xt_config config)
298{
299 // ensure that yaxt is initialized
300 assert(xt_initialized());
301
302 int is_inter;
303 xt_mpi_call(MPI_Comm_test_inter(comm, &is_inter), comm);
304 Xt_xmap xmap;
305 if (!is_inter)
306 xmap = xt_xmap_dist_dir_intracomm_custom_new(src_idxlist, dst_idxlist, comm,
307 config);
308 else {
309 MPI_Comm merge_comm, local_intra_comm;
310 MPI_Group local_group;
311 xt_mpi_call(MPI_Comm_group(comm, &local_group), comm);
312 xt_mpi_call(MPI_Intercomm_merge(comm, 0, &merge_comm), comm);
313 xt_mpi_call(MPI_Comm_create(merge_comm, local_group, &local_intra_comm),
314 comm);
315 xt_mpi_call(MPI_Group_free(&local_group), comm);
316 xt_mpi_call(MPI_Comm_free(&merge_comm), comm);
317 xt_mpi_comm_mark_exclusive(local_intra_comm);
318
319 xmap
320 = xt_xmap_dist_dir_intercomm_custom_new(src_idxlist, dst_idxlist,
321 comm, local_intra_comm, config);
322
323 xt_mpi_call(MPI_Comm_free(&local_intra_comm), local_intra_comm);
324 }
325 return xmap;
326}
327
328
329/*
330 * Local Variables:
331 * c-basic-offset: 2
332 * coding: utf-8
333 * indent-tabs-mode: nil
334 * show-trailing-whitespace: t
335 * require-trailing-newline: t
336 * End:
337 */
int MPI_Comm
Definition core.h:64
add versions of standard API functions not returning on error
#define xrealloc(ptr, size)
Definition ppm_xfuncs.h:71
#define xmalloc(size)
Definition ppm_xfuncs.h:70
Xt_idxlist list
Definition xt_core.h:154
Xt_int start
Definition xt_stripe.h:55
struct Xt_stripe * stripes
struct dist_dir * dst
struct dist_dir * src
struct Xt_com_list entries[]
struct Xt_config_ xt_default_config
Definition xt_config.c:203
implementation of configuration object
int xt_initialized(void)
void xt_idxlist_get_index_stripes_keep_buf(Xt_idxlist idxlist, struct Xt_stripe *stripes, size_t num_stripes_alloc)
Definition xt_idxlist.c:147
int xt_idxlist_get_num_index_stripes(Xt_idxlist idxlist)
Definition xt_idxlist.c:129
void xt_idxlist_pack(Xt_idxlist idxlist, void *buffer, int buffer_size, int *position, MPI_Comm comm)
Definition xt_idxlist.c:86
Xt_idxlist xt_idxlist_get_intersection_custom(Xt_idxlist idxlist_src, Xt_idxlist idxlist_dst, Xt_config config)
void xt_idxlist_delete(Xt_idxlist idxlist)
Definition xt_idxlist.c:75
Provide non-public declarations common to all index lists.
#define xt_idxlist_get_num_indices(idxlist)
struct Xt_stripes_alloc xt_idxstripes_alloc(size_t num_stripes)
Xt_idxlist xt_idxstripes_congeal(struct Xt_stripes_alloc stripes_alloc)
utility routines for MPI
#define xt_mpi_call(call, comm)
Definition xt_mpi.h:68
void xt_mpi_comm_mark_exclusive(MPI_Comm comm)
Definition xt_mpi.c:403
Xt_xmap xt_xmap_dist_dir_new(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm)
Xt_xmap xt_xmap_dist_dir_custom_new(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm, Xt_config config)
Xt_xmap xt_xmap_dist_dir_intracomm_custom_new(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm, Xt_config config)
int xt_com_list_rank_cmp(const void *a_, const void *b_)
int xt_xmdd_cmp_isect_src_rank(const void *a_, const void *b_)
int xt_xmdd_cmp_isect_dst_rank(const void *a_, const void *b_)
void xt_xmdd_free_dist_dirs(struct dist_dir_pair dist_dirs)
size_t xt_xmap_dist_dir_match_src_dst(const struct dist_dir *src_dist_dir, const struct dist_dir *dst_dist_dir, struct isect **src_dst_intersections, Xt_config config)
void xt_xmap_dist_dir_same_rank_merge(struct dist_dir **dist_dir_results)
static void xt_xmdd_free_dist_dir(struct dist_dir *dist_dir)
size_t xt_xmap_dist_dir_pack_intersections(enum xt_xmdd_direction target, size_t num_intersections, const struct isect *restrict src_dst_intersections, bool isect_idxlist_delete, size_t send_size_asize, size_t send_size_idx, int(*send_size)[send_size_asize], unsigned char *buffer, size_t buf_size, size_t *ofs, MPI_Comm comm)
static int stripe_cmp(const void *a, const void *b)
struct Xt_xmdd_txstat xt_xmap_dist_dir_send_intersections(void *restrict send_buffer, size_t send_size_asize, size_t send_size_entry, int tag, MPI_Comm comm, int rank_lim, MPI_Request *restrict requests, const int send_size[rank_lim][send_size_asize])
Utility functions for creation of distributed directories.
@ xt_xmdd_direction_dst
@ xt_xmdd_direction_src
Xt_xmap xt_xmap_dist_dir_intercomm_custom_new(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm inter_comm, MPI_Comm intra_comm, Xt_config config)