photon  1.1
photon_coll.c
Go to the documentation of this file.
1 // =============================================================================
2 // Photon RDMA Library (libphoton)
3 //
4 // Copyright (c) 2016, Trustees of Indiana University,
5 // All rights reserved.
6 //
7 // This software may be modified and distributed under the terms of the BSD
8 // license. See the COPYING file for details.
9 //
10 // This software was created at the Indiana University Center for Research in
11 // Extreme Scale Technologies (CREST).
12 // =============================================================================
13 
14 #include <stdlib.h>
15 
16 #include "photon_backend.h"
17 #include "photon_pwc.h"
18 #include "photon_coll.h"
19 #include "logging.h"
20 
21 #include "photon_pwc_coll.h"
22 
23 #ifdef ENABLE_NBC
24 #include "photon_pwc_nbc.h"
25 #endif
26 
27 static photon_coll_interface *coll_iface;
28 static void photon_coll_handle_cid(int proc, pwc_command cmd, photon_cid cid,
29  void *data, int size);
30 
31 static void *photon_coll_map[PHOTON_COLL_IFACE_MAX] = {
32  NULL, // no default
33  &pwc_collectives, // pwc interface
34 #ifdef ENABLE_NBC
35  &nbc_collectives // nbc interface
36 #else
37  NULL
38 #endif
39 };
40 
41 int photon_coll_init(photonConfig cfg) {
42  int rc;
43 
44  assert(cfg->coll > PHOTON_COLL_IFACE_DEFAULT &&
45  cfg->coll < PHOTON_COLL_IFACE_MAX);
46 
47  // default to the builtin PWC collectives
48  coll_iface = photon_coll_map[cfg->coll];
49 
50  if (!coll_iface) {
51  log_err("Requested collective interface is not enabled: %s",
52  PHOTON_COLL_IFACE_TO_STRING[cfg->coll]);
53  return PHOTON_ERROR;
54  }
55 
56  one_debug("Initializing the \"%s\" collective interface",
57  PHOTON_COLL_IFACE_TO_STRING[cfg->coll]);
58 
59  // do any initialization needed by given collective implementation
60  if (coll_iface->init) {
61  rc = coll_iface->init(cfg);
62  if (rc != PHOTON_OK) {
63  return PHOTON_ERROR;
64  }
65  }
66 
67  // register our handler to deal with collective completions
68  photon_pwc_register_ctype_handler(PWC_CTYPE_COLL, &photon_coll_handle_cid);
69 
70  return PHOTON_OK;
71 }
72 
73 int photon_coll_probe(int proc, int *flag, photon_cid *comp) {
74 
75  // do any probing needed by the configured collective interface
76  *flag = 0;
77  (*comp).u64 = 0;
78 
79  if (coll_iface->probe) {
80  return coll_iface->probe(proc, flag, comp);
81  }
82 
83  return PHOTON_OK;
84 }
85 
86 int _photon_collective_comm_create(void *active, int num_active, int total,
87  photonComm *c) {
88  int rc;
89  rc = coll_iface->comm_create(c, active, num_active, total);
90  if (rc != PHOTON_OK) {
91  log_err("Could not create communicator");
92  return PHOTON_ERROR;
93  }
94  return PHOTON_OK;
95 }
96 
98  photon_cid local, photon_rid *request, int flags) {
99  photonRequest req;
100  photon_coll_ctx *nctx;
101 
102  assert(ctype && ctype < PHOTON_COLL_MAXVAL);
103 
105  if (!req) {
106  log_err("Could not allocate new request");
107  goto error_exit;
108  }
109 
110  nctx = malloc(sizeof(*nctx));
111  if (!nctx) {
112  log_err("Could not allocate new collective context");
113  goto error_exit_free;
114  }
115 
116  if (c != NULL) {
117  nctx->comm = c;
118  } else {
119  nctx->comm = coll_iface->comm_get();
120  }
121 
122  req->op = REQUEST_OP_COLL;
123  req->type = ctype;
124  req->local_info.id = local;
125  req->remote_info.id.u64 = 0x0;
126  req->ctx = nctx;
127 
128  // control the return of the local id
129  if (flags & PHOTON_REQ_PWC_NO_LCE) {
130  req->flags |= REQUEST_FLAG_NO_LCE;
131  }
132 
133  *request = req->id;
134 
135  return PHOTON_OK;
136 
137 error_exit_free:
138  photon_free_request(req);
139 error_exit:
140  return PHOTON_ERROR;
141 }
142 
143 int _photon_collective_init_new_comm(void *active, int num_active, int total,
144  photon_coll ctype, photon_cid local,
145  photon_rid *request, int flags, photonComm *c) {
146  int rc;
147  void *comm = NULL;
148  photonRequest req;
149  photon_coll_ctx *nctx;
150 
151  assert(ctype && ctype < PHOTON_COLL_MAXVAL);
152 
154  if (!req) {
155  log_err("Could not allocate new request");
156  goto error_exit;
157  }
158 
159  nctx = malloc(sizeof(*nctx));
160  if (!nctx) {
161  log_err("Could not allocate new collective context");
162  goto error_exit_free;
163  }
164 
165  rc = _photon_collective_comm_create(active, num_active, total, &comm);
166  if (rc != PHOTON_OK) {
167  log_err("Could not create communicator");
168  goto error_exit_ctx;
169  }
170 
171  nctx->comm = comm;
172 
173  req->op = REQUEST_OP_COLL;
174  req->type = ctype;
175  req->local_info.id = local;
176  req->remote_info.id.u64 = 0x0;
177  req->ctx = nctx;
178 
179  *request = req->id;
180 
181  return PHOTON_OK;
182 
183 error_exit_ctx:
184  free(nctx);
185 error_exit_free:
186  photon_free_request(req);
187 error_exit:
188  return PHOTON_ERROR;
189 }
190 
191 int _photon_collective_join(photon_rid request, void *in, void *out, int scount, int rcount,
192  photonDatatype stype, photonDatatype rtype, int root, void *op) {
193 
194  photonRequest req;
195  photon_coll_ctx *ctx;
196 
197  req = photon_lookup_request(request);
198  if (!req) {
199  log_err("No request found with rid: 0x%016lx\n", request);
200  goto error_exit;
201  }
202 
203  assert(req->op == REQUEST_OP_COLL);
204 
205  ctx = (photon_coll_ctx *)req->ctx;
206 
207  switch (req->type) {
208  case PHOTON_COLL_BARRIER: {
209  assert(coll_iface->barrier);
210  coll_iface->barrier(ctx->comm);
211  goto block_exit;
212  break;
213  }
214  case PHOTON_COLL_IBARRIER: {
215  // here we return immediately and the request completes later
216  // either via coll_probe above or through native PWC ibarrier
217  assert(coll_iface->ibarrier);
218  return coll_iface->ibarrier(ctx->comm, req);
219  break;
220  }
221  case PHOTON_COLL_REDUCE: {
222  assert(coll_iface->reduce);
223  coll_iface->reduce(in, out, scount, stype, op, root, ctx->comm);
224  goto block_exit;
225  break;
226  }
227  case PHOTON_COLL_IREDUCE: {
228  assert(coll_iface->ireduce);
229  return coll_iface->ireduce(in, out, scount, stype, op, root, ctx->comm, req);
230  break;
231  }
232  case PHOTON_COLL_GATHER: {
233  assert(coll_iface->gather);
234  coll_iface->gather(in, out, scount, rcount, stype, rtype, root, ctx->comm);
235  goto block_exit;
236  break;
237  }
238  case PHOTON_COLL_IGATHER: {
239  assert(coll_iface->igather);
240  return coll_iface->igather(in, out, scount, rcount, stype, rtype, root, ctx->comm, req);
241  break;
242  }
243  case PHOTON_COLL_ALLREDUCE: {
244  assert(coll_iface->allreduce);
245  coll_iface->allreduce(in, out, scount, stype, op, ctx->comm);
246  goto block_exit;
247  break;
248  }
249  case PHOTON_COLL_IALLREDUCE: {
250  // return immediately
251  assert(coll_iface->iallreduce);
252  return coll_iface->iallreduce(in, out, scount, stype, op,
253  ctx->comm, req);
254  break;
255  }
256  case PHOTON_COLL_SCAN:
257  case PHOTON_COLL_ISCAN: break;
258  default:
259  log_err("unsupported collective type: %d", req->type);
260  goto error_exit;
261  break;
262  }
263 
264  // for blocking collectives, we immediately enqueue the request that has been completed
265  block_exit:
266  photon_pwc_add_req(req);
267 
268  return PHOTON_OK;
269 
270 error_exit:
271  return PHOTON_ERROR;
272 }
273 
274 static void photon_coll_handle_cid(int proc, pwc_command cmd, photon_cid cid,
275  void *data, int size) {
276  if (coll_iface->cid_handler) {
277  coll_iface->cid_handler(proc, cmd, cid, data, size);
278  }
279 }
int _photon_collective_join(photon_rid request, void *in, void *out, int scount, int rcount, photonDatatype stype, photonDatatype rtype, int root, void *op)
Definition: photon_coll.c:191
int(* allreduce)(const void *in, void *out, int count, photonDatatype datatype, void *op, photonComm comm)
Definition: photon_coll.h:46
photon_datatype_t photonDatatype
photonRequest photon_lookup_request(photon_rid rid)
photon_coll_interface pwc_collectives
The Photon completion ID used by the PWC API.
Definition: photon.h:78
int photon_pwc_register_ctype_handler(pwc_cid_type type, void *handler)
Definition: photon_pwc.c:743
int photon_coll_probe(int proc, int *flag, photon_cid *comp)
Definition: photon_coll.c:73
int(* probe)(int proc, int *flag, photon_cid *c)
Definition: photon_coll.h:27
uint64_t photon_rid
The Photon request ID.
Definition: photon.h:75
int(* igather)(const void *in, void *out, int scount, int rcount, photonDatatype stype, photonDatatype rtype, int root, photonComm comm, photonRequest req)
Definition: photon_coll.h:42
int(* ireduce)(const void *in, void *out, int count, photonDatatype datatype, void *op, int root, photonComm comm, photonRequest req)
Definition: photon_coll.h:36
pwc_command
Definition: photon_pwc.h:37
int(* iallreduce)(const void *in, void *out, int count, photonDatatype datatype, void *op, photonComm comm, photonRequest req)
Definition: photon_coll.h:48
int _photon_myrank
Definition: libphoton.c:54
photonRequest photon_get_request(int proc)
int _photon_collective_init_new_comm(void *active, int num_active, int total, photon_coll ctype, photon_cid local, photon_rid *request, int flags, photonComm *c)
Definition: photon_coll.c:143
struct photon_req_t * photonRequest
#define PHOTON_ERROR
Error code, general error.
Definition: photon.h:32
int photon_free_request(photonRequest req)
int(* comm_create)(void *c, void *active, int num_active, int total)
Definition: photon_coll.h:25
int(* cid_handler)(int proc, pwc_command cmd, photon_cid cid, void *data, int size)
Definition: photon_coll.h:28
#define PHOTON_REQ_PWC_NO_LCE
PWC flag: probe will not return a local completion ID.
Definition: photon.h:43
#define PHOTON_OK
Photon success code.
Definition: photon.h:30
int(* barrier)(photonComm comm)
Definition: photon_coll.h:31
int(* ibarrier)(photonComm comm, photonRequest req)
Definition: photon_coll.h:32
int _photon_collective_init(photonComm c, photon_coll ctype, photon_cid local, photon_rid *request, int flags)
Definition: photon_coll.c:97
#define REQUEST_FLAG_NO_LCE
#define REQUEST_OP_COLL
uint64_t u64
Unsigned 64b representation of the ID.
Definition: photon.h:80
int photon_coll_init(photonConfig cfg)
Definition: photon_coll.c:41
int(* gather)(const void *in, void *out, int scount, int rcount, photonDatatype stype, photonDatatype rtype, int root, photonComm comm)
Definition: photon_coll.h:39
int(* init)(photonConfig cfg)
Definition: photon_coll.h:24
int(* reduce)(const void *in, void *out, int count, photonDatatype datatype, void *op, int root, photonComm comm)
Definition: photon_coll.h:34
int photon_pwc_add_req(photonRequest req)
Definition: photon_pwc.c:749
void * photonComm
void *(* comm_get)()
Definition: photon_coll.h:26
int _photon_collective_comm_create(void *active, int num_active, int total, photonComm *c)
Definition: photon_coll.c:86