photon  1.1
photon_pwc_coll.c
Go to the documentation of this file.
1 // =============================================================================
2 // Photon RDMA Library (libphoton)
3 //
4 // Copyright (c) 2016, Trustees of Indiana University,
5 // All rights reserved.
6 //
7 // This software may be modified and distributed under the terms of the BSD
8 // license. See the COPYING file for details.
9 //
10 // This software was created at the Indiana University Center for Research in
11 // Extreme Scale Technologies (CREST).
12 // =============================================================================
13 
14 #include <stdlib.h>
15 #include <string.h>
16 #include <sys/types.h>
17 #include <sys/queue.h>
18 
19 #include "photon_backend.h"
20 #include "photon_buffer.h"
21 #include "photon_exchange.h"
22 #include "photon_pwc.h"
23 #include "photon_pwc_coll.h"
24 
25 #ifdef HAVE_MPI
26 #include <mpi.h>
27 #endif
28 
// Forward declarations of the file-local collective entry points. Each one is
// referenced by name in the designated-initializer table that follows, so they
// must be declared before that table.
//
// Lifecycle / communicator hooks.
29 static int pwc_coll_init(photonConfig cfg);
30 static int pwc_comm_create(photonComm c, void *active, int num_active, int total);
31 static photonComm pwc_comm_get();
// Progress hooks: probe and completion-ID (command) handler.
32 static int pwc_coll_probe(int proc, int *flag, photon_cid *c);
33 static int pwc_cid_handler(int proc, pwc_command cmd, photon_cid cid,
34  void *data, int size);
// Barrier, blocking and request-based.
35 static int pwc_barrier(photonComm comm);
36 static int pwc_ibarrier(photonComm comm, photonRequest req);
// Data collectives. In this file the blocking forms forward to MPI when
// HAVE_MPI is defined; the request-based ("i") forms only log a warning.
37 static int pwc_reduce(const void *in, void *out, int count, photonDatatype dt, void *op,
38  int root, photonComm comm);
39 static int pwc_ireduce(const void *in, void *out, int count, photonDatatype dt, void *op,
40  int root, photonComm comm, photonRequest req);
41 static int pwc_gather(const void *in, void *out, int scount, int rcount, photonDatatype sdt,
42  photonDatatype rdt, int root, photonComm comm);
43 static int pwc_igather(const void *in, void *out, int scount, int rcount, photonDatatype sdt,
44  photonDatatype rdt, int root, photonComm comm, photonRequest req);
45 static int pwc_allreduce(const void *in, void *out, int count, photonDatatype dt, void *op,
46  photonComm comm);
47 static int pwc_iallreduce(const void *in, void *out, int count, photonDatatype dt, void *op,
48  photonComm comm, photonRequest req);
49 
50 
52  .init = pwc_coll_init,
53  .comm_create = pwc_comm_create,
54  .comm_get = pwc_comm_get,
55  .probe = pwc_coll_probe,
56  .cid_handler = pwc_cid_handler,
57  .barrier = pwc_barrier,
58  .ibarrier = pwc_ibarrier,
59  .reduce = pwc_reduce,
60  .ireduce = pwc_ireduce,
61  .gather = pwc_gather,
62  .igather = pwc_igather,
63  .allreduce = pwc_allreduce,
64  .iallreduce = pwc_iallreduce,
65  .scan = NULL,
66  .iscan = NULL
67 };
68 
// Blocking-barrier flag bytes: _photon_nproc + 1 entries allocated in
// pwc_coll_init; the final entry is the local "send" byte (set to 1).
69 static uint8_t *barrier_ary;
// ibarrier accounting. The counter array and bit array are only allocated
// on rank 0 (see pwc_coll_init); the queue is created on every rank and
// holds the requests passed to pwc_ibarrier.
70 static uint32_t *ibarrier_ary;
71 static BIT_ARRAY *ibarrier_ba;
72 static two_lock_queue_t *ibarrier_q;
73 
// Held while photon_pwc_barrier_set_and_check updates ibarrier_ba/ibarrier_ary.
74 static tatas_lock_t barr_lock;
// Descriptor of the local barrier buffer; after init its addr points at the
// send byte (barrier_ary[_photon_nproc]).
75 static struct photon_buffer_t lbuf;
// One descriptor per rank, filled by photon_exchange_allgather during init.
76 static photonBuffer bufs;
77 
// Sets up the state used by the native barrier/ibarrier:
//  - allocates the barrier flag array (one byte per rank plus a send byte),
//  - creates a buffer handle for it and copies its descriptor into lbuf,
//  - allgathers every rank's descriptor into bufs,
//  - creates the ibarrier queue and lock, plus (rank 0 only) the counter
//    array and bit array.
// Returns PHOTON_OK on success, PHOTON_ERROR on failure. cfg is not read.
78 static int pwc_coll_init(photonConfig cfg) {
79  int rc;
80  photonBufferHandle local;
81 
82  // our barrier array has one extra element used as a "send" byte
83  barrier_ary = (uint8_t *)calloc(_photon_nproc + 1, sizeof(uint8_t));
84  if (!barrier_ary) {
85  log_err("Could not allocate barrier space");
86  goto error_exit;
87  }
88 
89  // register our local barrier buffer
90  local = photon_buffer_create(barrier_ary, _photon_nproc + 1, BUFFER_FLAG_NIL);
 // NOTE(review): listing line 91 is missing from this extract (numbering
 // jumps 90 -> 92); presumably the photon_buffer_register() call - confirm
 // against the real source.
 // NOTE(review): local is dereferenced below without a NULL check.
92  memcpy(&lbuf, &local->bint.buf, sizeof(struct photon_buffer_t));
93 
94  bufs = (photonBuffer)calloc(_photon_nproc, sizeof(struct photon_buffer_t));
95  if (!bufs) {
96  goto error_exit_bar;
97  }
98 
99  rc = photon_exchange_allgather(&lbuf, (void*)bufs, sizeof(lbuf));
100  if (rc != PHOTON_OK) {
101  log_err("Could not gather remote buffer info");
 // NOTE(review): this jumps past error_exit_bar, so barrier_ary and bufs
 // are not freed on this path, and local is not released either.
102  goto error_exit;
103  }
104 
105  // set the last element to 1 and adjust the send address
 // (lbuf was already gathered above with the base address; only the local
 // copy is advanced so that puts from this rank send the byte holding 1)
106  barrier_ary[_photon_nproc] = 1;
107  lbuf.addr += _photon_nproc;
108 
109  // setup ibarrier accounting
110  ibarrier_q = sync_two_lock_queue_new();
111  sync_tatas_init(&barr_lock);
 // only rank 0 tracks which ranks have joined an ibarrier
 // NOTE(review): neither allocation result below is checked.
112  if (_photon_myrank == 0) {
113  ibarrier_ary = calloc(_photon_nproc, sizeof(uint32_t));
114  ibarrier_ba = bit_array_create(_photon_nproc);
115  bit_array_clear_all(ibarrier_ba);
116  }
117 
118  photon_buffer_free(local);
119 
120  return PHOTON_OK;
121 
 // NOTE(review): local is not released on either error path.
122  error_exit_bar:
123  free(barrier_ary);
124  error_exit:
125  return PHOTON_ERROR;
126 }
127 
128 static int pwc_coll_probe(int proc, int *flag, photon_cid *c) {
129  return PHOTON_OK;
130 }
131 
// Command handler invoked for collective completion IDs.
//
// Only PWC_COMMAND_BARRIER is acted on; every other command falls through to
// the default case. Completions that originate from this rank are ignored.
// For a remote barrier completion, the visible code completes a pending
// request via photon_pwc_add_req() when either flag is set or this is not
// rank 0. Always returns PHOTON_OK. cid, data and size are not read in the
// visible code.
132 static int pwc_cid_handler(int proc, pwc_command cmd, photon_cid cid,
133  void *data, int size) {
134  switch(cmd) {
135  case PWC_COMMAND_BARRIER:
136  {
137  // don't do anything for local completions
138  if (proc == _photon_myrank)
139  return PHOTON_OK;
140 
141  int flag = 0;
142  photonRequest req;
143 
144  if (_photon_myrank == 0) {
 // NOTE(review): listing line 145 is missing from this extract; presumably
 // it records proc's arrival and sets flag (likely via
 // photon_pwc_barrier_set_and_check) - confirm against the real source.
146  }
147 
148  if (flag || _photon_myrank) {
 // NOTE(review): listing line 149 is missing; req is never assigned in the
 // visible text. Presumably it is taken from the ibarrier queue (likely via
 // photon_pwc_barrier_dequeue) - confirm.
150  assert(req);
151  photon_pwc_add_req(req);
152  }
153  }
154  break;
155  default:
156  break;
157  }
158 
159  return PHOTON_OK;
160 }
161 
162 
163 static int pwc_comm_create(void *c, void *active, int num_active, int total) {
164  return PHOTON_OK;
165 }
166 
167 static photonComm pwc_comm_get() {
168  return NULL;
169 }
170 
171 // native pwc barrier
// Blocking barrier built on one-byte puts into barrier_ary.
//  - Rank 0 busy-waits until every other rank i has written a non-zero byte
//    into barrier_ary[i], clears each byte, then puts the send byte into
//    position 0 of every other rank's barrier buffer.
//  - Every other rank puts its send byte into rank 0's buffer at offset
//    _photon_myrank, then busy-waits on its own barrier_ary[0] and clears it.
// The comm argument is not read. Returns PHOTON_OK, or PHOTON_ERROR if a put
// fails.
172 static int pwc_barrier(void *comm) {
173  int i, rc;
174  uint8_t val;
175  struct photon_buffer_t rbuf;
176  photon_cid cid = {.u64=0};
177 
178  if (_photon_myrank == 0) {
 // gather phase: wait for, then consume, each rank's arrival byte
179  for (i=1; i<_photon_nproc; i++) {
180  do {
181  val = sync_load(&barrier_ary[i], SYNC_RELAXED);
182  } while (!val);
183  sync_store(&barrier_ary[i], 0, SYNC_RELAXED);
184  }
185 
186  // reply to each rank (position 0 at each rank)
187  for (i=1; i<_photon_nproc; i++) {
188  rbuf.addr = bufs[i].addr;
189  rbuf.size = sizeof(val);
190  rbuf.priv = bufs[i].priv;
191  rc = _photon_put_with_completion(i, sizeof(val), &lbuf, &rbuf,
192  cid, cid,
 // NOTE(review): listing lines 193-194 (the remaining flags/type/command
 // arguments and the closing of this call) are missing from this extract.
195  if (rc != PHOTON_OK) {
196  log_err("Could not send barrier message to rank %d", i);
197  goto error_exit;
198  }
199  }
200  }
201  else {
202  // send to rank 0 at my barrier buffer entry
203  // (base address plus rank offset)
204  rbuf.addr = bufs[0].addr + _photon_myrank * sizeof(val);
205  rbuf.size = sizeof(val);
206  rbuf.priv = bufs[0].priv;
207  rc = _photon_put_with_completion(0, sizeof(val), &lbuf, &rbuf,
208  cid, cid,
 // NOTE(review): listing lines 209-210 (the remaining arguments of this
 // call) are missing from this extract.
211  if (rc != PHOTON_OK) {
212  log_err("Could not send barrier message to rank 0");
213  goto error_exit;
214  }
215 
 // wait for rank 0's release byte, then reset it for the next barrier
216  do {
217  val = sync_load(&barrier_ary[0], SYNC_RELAXED);
218  } while (!val);
219  sync_store(&barrier_ary[0], 0, SYNC_RELAXED);
220  }
221  return PHOTON_OK;
222 
223  error_exit:
224  return PHOTON_ERROR;
225 }
226 
227 // native pwc ibarrier
// Request-based barrier. The caller's request is appended to ibarrier_q;
// non-zero ranks then send a zero-length put to rank 0, while rank 0 takes the
// else branch and, if flag is set, dequeues a request and hands it to
// photon_pwc_add_req(). The comm argument is not read.
// Returns PHOTON_OK, or PHOTON_ERROR if the put to rank 0 fails.
228 static int pwc_ibarrier(void *comm, photonRequest req) {
229  int rc;
230  photon_cid cid = {.u64=0};
231 
232  // ibarriers are enqueued in the order in which they are called
233  sync_two_lock_queue_enqueue(ibarrier_q, req);
234 
235  if (_photon_myrank) {
236  rc = _photon_put_with_completion(0, 0, NULL, NULL,
237  cid, cid,
 // NOTE(review): listing lines 238-239 (the remaining arguments of this
 // call) are missing from this extract.
240  if (rc != PHOTON_OK) {
241  log_err("Could not send barrier message to rank 0");
242  goto error_exit;
243  }
244  }
245  else {
246  int flag = 0;
 // NOTE(review): listing line 247 is missing; flag is never set in the
 // visible text. Presumably rank 0 registers itself here (likely via
 // photon_pwc_barrier_set_and_check) - confirm against the real source.
248  if (flag) {
249  // rank 0 might be the last to join
 // NOTE(review): this local req shadows the req parameter.
250  photonRequest req = sync_two_lock_queue_dequeue(ibarrier_q);
251  assert(req);
252  photon_pwc_add_req(req);
253  }
254  }
255 
256  return PHOTON_OK;
257 
258  error_exit:
 // NOTE(review): this dequeues one entry and discards it; whether that entry
 // is the request enqueued above depends on the queue's ordering and on no
 // other ibarrier being pending - verify.
259  sync_two_lock_queue_dequeue(ibarrier_q);
260  return PHOTON_ERROR;
261 }
262 
// Records that rank proc has joined an ibarrier and reports through *flag
// (0 or 1) whether this arrival completed one.
//
// Under barr_lock: sets proc's bit, increments proc's counter, and if all
// _photon_nproc bits are now set, decrements every counter, clears the bit of
// any rank whose counter reached zero, and sets *flag to 1. When *flag is 1, a
// zero-length put is then sent to every rank from 1 to _photon_nproc-1.
//
// ibarrier_ba and ibarrier_ary are only allocated on rank 0 in pwc_coll_init,
// so this must only run there.
// Returns PHOTON_OK, or PHOTON_ERROR on the first put that fails.
263 int photon_pwc_barrier_set_and_check(int proc, int *flag) {
264  int i, rc;
265  bit_index_t tst;
266  photon_cid cid = {.u64=0};
267 
268  *flag = 0;
269 
270  sync_tatas_acquire(&barr_lock);
271  {
272  bit_array_set(ibarrier_ba, proc);
 // a counter rather than a plain bit - presumably so a rank can be ahead
 // by more than one outstanding ibarrier; confirm
273  ibarrier_ary[proc]++;
274 
275  tst = bit_array_num_bits_set(ibarrier_ba);
276  if (tst == _photon_nproc) {
277  for (i=0; i<_photon_nproc; i++) {
278  ibarrier_ary[i]--;
279  if (ibarrier_ary[i] == 0) {
280  bit_array_clear(ibarrier_ba, i);
281  }
282  }
283  *flag = 1;
284  }
285  }
286  sync_tatas_release(&barr_lock);
287 
 // notifications are sent after the lock has been released
288  if (*flag) {
289  for (i=1; i<_photon_nproc; i++) {
290  rc = _photon_put_with_completion(i, 0, NULL, NULL,
291  cid, cid,
 // NOTE(review): listing lines 292-293 (the remaining arguments of this
 // call) are missing from this extract.
294  if (rc != PHOTON_OK) {
295  log_err("Could not send barrier reply to rank %d", i);
296  goto error_exit;
297  }
298  }
299  }
300 
301  return PHOTON_OK;
302 
303  error_exit:
304  return PHOTON_ERROR;
305 }
306 
308  return sync_two_lock_queue_dequeue(ibarrier_q);
309 }
310 
311 static int pwc_reduce(const void *in, void *out, int count, photonDatatype dt, void *op,
312  int root, photonComm comm) {
313 #ifdef HAVE_MPI
314  int rc = MPI_Reduce(in, out, count, dt, op, root, (MPI_Comm)comm);
315  if (rc == MPI_SUCCESS) {
316  return PHOTON_OK;
317  }
318  else return PHOTON_ERROR;
319 #endif
320 
321  log_warn("REDUCE not yet implemented");
322  return PHOTON_ERROR_RESOURCE;
323 }
324 
325 static int pwc_ireduce(const void *in, void *out, int count, photonDatatype dt, void *op,
326  int root, photonComm comm, photonRequest req) {
327  log_warn("IREDUCE not yet implemented");
328  return PHOTON_ERROR_RESOURCE;
329 }
330 
331 static int pwc_gather(const void *in, void *out, int scount, int rcount, photonDatatype sdt,
332  photonDatatype rdt, int root, photonComm comm) {
333 #ifdef HAVE_MPI
334  int rc = MPI_Gather(in, scount, sdt, out, rcount, rdt, root, (MPI_Comm)comm);
335  if (rc == MPI_SUCCESS) {
336  return PHOTON_OK;
337  }
338  else return PHOTON_ERROR;
339 #endif
340 
341  log_warn("GATHER not yet implemented");
342  return PHOTON_ERROR_RESOURCE;
343 }
344 
345 static int pwc_igather(const void *in, void *out, int scount, int rcount, photonDatatype sdt,
346  photonDatatype rdt, int root, photonComm comm, photonRequest req) {
347  log_warn("IGATHER not yet implemented");
348  return PHOTON_ERROR_RESOURCE;
349 }
350 
351 static int pwc_allreduce(const void *in, void *out, int count, photonDatatype dt, void *op,
352  photonComm comm) {
353  #ifdef HAVE_MPI
354  int rc = MPI_Allreduce(in, out, count, dt, op, (MPI_Comm)comm);
355  if (rc == MPI_SUCCESS) {
356  return PHOTON_OK;
357  }
358  else return PHOTON_ERROR;
359 #endif
360 
361  log_warn("ALLREDUCE not yet implemented");
362  return PHOTON_ERROR_RESOURCE;
363 }
364 
365 static int pwc_iallreduce(const void *in, void *out, int count, photonDatatype dt, void *op,
366  photonComm comm, photonRequest req) {
367  log_warn("IALLREDUCE not yet implemented");
368  return PHOTON_ERROR_RESOURCE;
369 }
Convenience pointer type for the buffer structure.
Definition: photon.h:105
photon_datatype_t photonDatatype
photon_coll_interface pwc_collectives
photonBufferHandle photon_buffer_create(void *addr, uint64_t size, int flags)
The Photon completion ID used by the PWC API.
Definition: photon.h:78
struct photon_buffer_handle_t * photonBufferHandle
int _photon_nproc
Definition: libphoton.c:55
photonRequest photon_pwc_barrier_dequeue()
pwc_command
Definition: photon_pwc.h:37
struct photon_buffer_priv_t priv
The associated private buffer information.
Definition: photon.h:108
int _photon_myrank
Definition: libphoton.c:54
#define PHOTON_REQ_PWC_NO_RCE
PWC flag: do not send a remote completion ID.
Definition: photon.h:44
void photon_buffer_free(photonBufferHandle buf)
struct photon_req_t * photonRequest
int photon_buffer_register(photonBufferHandle buf, int flags)
#define PHOTON_ERROR
Error code, general error.
Definition: photon.h:32
uint64_t size
The size of the buffer in bytes.
Definition: photon.h:107
#define PHOTON_REQ_PWC_NO_LCE
PWC flag: probe will not return a local completion ID.
Definition: photon.h:43
#define PHOTON_OK
Photon success code.
Definition: photon.h:30
int _photon_put_with_completion(int proc, uint64_t size, photonBuffer lbuf, photonBuffer rbuf, photon_cid local, photon_cid remote, int flags, pwc_cid_type type, pwc_command cmd)
Definition: photon_pwc.c:759
uint64_t u64
Unsigned 64b representation of the ID.
Definition: photon.h:80
#define PHOTON_ERROR_RESOURCE
Error code, resource not available.
Definition: photon.h:33
#define BUFFER_FLAG_NIL
int(* init)(photonConfig cfg)
Definition: photon_coll.h:24
int photon_exchange_allgather(void *ptr, void *ivec_ptr, int n)
int photon_pwc_add_req(photonRequest req)
Definition: photon_pwc.c:749
void * photonComm
int photon_pwc_barrier_set_and_check(int proc, int *flag)
uintptr_t addr
The base address of the buffer.
Definition: photon.h:106