16 #include <sys/types.h> 17 #include <sys/queue.h> 29 static int pwc_coll_init(photonConfig cfg);
30 static int pwc_comm_create(
photonComm c,
void *active,
int num_active,
int total);
32 static int pwc_coll_probe(
int proc,
int *flag,
photon_cid *c);
34 void *data,
int size);
37 static int pwc_reduce(
const void *in,
void *out,
int count,
photonDatatype dt,
void *op,
39 static int pwc_ireduce(
const void *in,
void *out,
int count,
photonDatatype dt,
void *op,
41 static int pwc_gather(
const void *in,
void *out,
int scount,
int rcount,
photonDatatype sdt,
43 static int pwc_igather(
const void *in,
void *out,
int scount,
int rcount,
photonDatatype sdt,
45 static int pwc_allreduce(
const void *in,
void *out,
int count,
photonDatatype dt,
void *op,
47 static int pwc_iallreduce(
const void *in,
void *out,
int count,
photonDatatype dt,
void *op,
52 .
init = pwc_coll_init,
53 .comm_create = pwc_comm_create,
54 .comm_get = pwc_comm_get,
55 .probe = pwc_coll_probe,
56 .cid_handler = pwc_cid_handler,
57 .barrier = pwc_barrier,
58 .ibarrier = pwc_ibarrier,
60 .ireduce = pwc_ireduce,
62 .igather = pwc_igather,
63 .allreduce = pwc_allreduce,
64 .iallreduce = pwc_iallreduce,
69 static uint8_t *barrier_ary;
70 static uint32_t *ibarrier_ary;
71 static BIT_ARRAY *ibarrier_ba;
72 static two_lock_queue_t *ibarrier_q;
74 static tatas_lock_t barr_lock;
76 static photonBuffer bufs;
78 static int pwc_coll_init(photonConfig cfg) {
83 barrier_ary = (uint8_t *)calloc(
_photon_nproc + 1,
sizeof(uint8_t));
85 log_err(
"Could not allocate barrier space");
101 log_err(
"Could not gather remote buffer info");
110 ibarrier_q = sync_two_lock_queue_new();
111 sync_tatas_init(&barr_lock);
115 bit_array_clear_all(ibarrier_ba);
128 static int pwc_coll_probe(
int proc,
int *flag,
photon_cid *c) {
133 void *data,
int size) {
163 static int pwc_comm_create(
void *c,
void *active,
int num_active,
int total) {
172 static int pwc_barrier(
void *comm) {
181 val = sync_load(&barrier_ary[i], SYNC_RELAXED);
183 sync_store(&barrier_ary[i], 0, SYNC_RELAXED);
188 rbuf.
addr = bufs[i].addr;
189 rbuf.
size =
sizeof(val);
190 rbuf.
priv = bufs[i].priv;
196 log_err(
"Could not send barrier message to rank %d", i);
205 rbuf.
size =
sizeof(val);
206 rbuf.
priv = bufs[0].priv;
212 log_err(
"Could not send barrier message to rank 0");
217 val = sync_load(&barrier_ary[0], SYNC_RELAXED);
219 sync_store(&barrier_ary[0], 0, SYNC_RELAXED);
233 sync_two_lock_queue_enqueue(ibarrier_q, req);
241 log_err(
"Could not send barrier message to rank 0");
259 sync_two_lock_queue_dequeue(ibarrier_q);
270 sync_tatas_acquire(&barr_lock);
272 bit_array_set(ibarrier_ba, proc);
273 ibarrier_ary[proc]++;
275 tst = bit_array_num_bits_set(ibarrier_ba);
279 if (ibarrier_ary[i] == 0) {
280 bit_array_clear(ibarrier_ba, i);
286 sync_tatas_release(&barr_lock);
295 log_err(
"Could not send barrier reply to rank %d", i);
308 return sync_two_lock_queue_dequeue(ibarrier_q);
311 static int pwc_reduce(
const void *in,
void *out,
int count,
photonDatatype dt,
void *op,
314 int rc = MPI_Reduce(in, out, count, dt, op, root, (MPI_Comm)comm);
315 if (rc == MPI_SUCCESS) {
321 log_warn(
"REDUCE not yet implemented");
325 static int pwc_ireduce(
const void *in,
void *out,
int count,
photonDatatype dt,
void *op,
327 log_warn(
"IREDUCE not yet implemented");
331 static int pwc_gather(
const void *in,
void *out,
int scount,
int rcount,
photonDatatype sdt,
334 int rc = MPI_Gather(in, scount, sdt, out, rcount, rdt, root, (MPI_Comm)comm);
335 if (rc == MPI_SUCCESS) {
341 log_warn(
"GATHER not yet implemented");
345 static int pwc_igather(
const void *in,
void *out,
int scount,
int rcount,
photonDatatype sdt,
347 log_warn(
"IGATHER not yet implemented");
351 static int pwc_allreduce(
const void *in,
void *out,
int count,
photonDatatype dt,
void *op,
354 int rc = MPI_Allreduce(in, out, count, dt, op, (MPI_Comm)comm);
355 if (rc == MPI_SUCCESS) {
361 log_warn(
"ALLREDUCE not yet implemented");
365 static int pwc_iallreduce(
const void *in,
void *out,
int count,
photonDatatype dt,
void *op,
367 log_warn(
"IALLREDUCE not yet implemented");
Convenience pointer type for the buffer structure.
photon_datatype_t photonDatatype
photon_coll_interface pwc_collectives
photonBufferHandle photon_buffer_create(void *addr, uint64_t size, int flags)
The Photon completion ID used by the PWC API.
struct photon_buffer_handle_t * photonBufferHandle
photonRequest photon_pwc_barrier_dequeue()
struct photon_buffer_priv_t priv
The associated private buffer information.
#define PHOTON_REQ_PWC_NO_RCE
PWC flag: do not send a remote completion ID.
void photon_buffer_free(photonBufferHandle buf)
struct photon_req_t * photonRequest
int photon_buffer_register(photonBufferHandle buf, int flags)
#define PHOTON_ERROR
Error code, general error.
uint64_t size
The size of the buffer in bytes.
#define PHOTON_REQ_PWC_NO_LCE
PWC flag: probe will not return a local completion ID.
#define PHOTON_OK
Photon success code.
int _photon_put_with_completion(int proc, uint64_t size, photonBuffer lbuf, photonBuffer rbuf, photon_cid local, photon_cid remote, int flags, pwc_cid_type type, pwc_command cmd)
uint64_t u64
Unsigned 64b representation of the ID.
#define PHOTON_ERROR_RESOURCE
Error code, resource not available.
int(* init)(photonConfig cfg)
int photon_exchange_allgather(void *ptr, void *ivec_ptr, int n)
int photon_pwc_add_req(photonRequest req)
int photon_pwc_barrier_set_and_check(int proc, int *flag)
uintptr_t addr
The base address of the buffer.