photon  1.1
photon_pwc.c
Go to the documentation of this file.
1 // =============================================================================
2 // Photon RDMA Library (libphoton)
3 //
4 // Copyright (c) 2016, Trustees of Indiana University,
5 // All rights reserved.
6 //
7 // This software may be modified and distributed under the terms of the BSD
8 // license. See the COPYING file for details.
9 //
10 // This software was created at the Indiana University Center for Research in
11 // Extreme Scale Technologies (CREST).
12 // =============================================================================
13 
14 #include <stdlib.h>
15 #include <string.h>
16 #include <stdint.h>
17 #include <inttypes.h>
18 #include <assert.h>
19 
20 #include "photon_backend.h"
21 #include "photon_buffertable.h"
22 #include "photon_event.h"
23 #include "photon_pwc.h"
24 #include "photon_coll.h"
25 #include "util.h"
26 
// Forward declarations for the file-local PWC helpers defined below.
// Naming pattern used by the helpers:
//   test_*  - check whether the peer currently has room (eager-buffer offset
//             or ledger slot) and, if so, return the offset/slot to use
//   try_*   - post the RDMA operation(s) for a request at a given offset/slot
//   safe_*  - test + try performed while holding the matching lock in the
//             peer's request table
static int photon_pwc_test_gwc_pwc(int proc, int *ret_offset);
static int photon_pwc_safe_ledger(photonRequest req);
static int photon_pwc_test_ledger(int proc, int *ret_offset);
static int photon_pwc_try_ledger(photonRequest req, int curr);
static int photon_pwc_safe_packed(photonRequest req);
static int photon_pwc_test_packed(int proc, uint64_t size, int *ret_offset);
static int photon_pwc_try_packed(photonRequest req, int offset);
static int photon_pwc_try_gwc(photonRequest req);
static int photon_pwc_try_gwc_pwc(photonRequest req, int offset);
static int photon_pwc_handle_comp_req(photonRequest req, int *flag,
                                      photon_cid *c, void (*cb)(photon_cid));
static int photon_pwc_pack_cid(photonRequest req, photonCIDLedgerEntry entry);

// Queue of completed requests waiting to be picked up by the probe path.
// There is a single queue for the module, not one per peer.
// Presumably it is filled by photon_pwc_add_req and drained by
// photon_pwc_pop_req; confirm against those definitions.
static two_lock_queue_t *comp_q;
// One array per peer, each holding _photon_pbsize uint16_t entries
// (allocated in photon_pwc_init). An entry records the size of a packed
// message that was consumed ahead of the eager-buffer progress point.
// See photon_pwc_handle_pooo.
static uint16_t **pbufs;
// Total number of requests sitting in any peer's PWC, GWC or GWC-PWC queue.
// It is adjusted with sync_fadd alongside the per-table counters.
// NOTE(review): volatile does not make this atomic; correctness relies on
// every update going through sync_fadd.
static volatile uint32_t qcount = 0;
// Optional per-ctype handlers, stored as void* and invoked as
// void (*)(int proc, pwc_command cmd, photon_cid cid, void *data, int size).
// An entry is NULL when no handler is registered (see photon_pwc_init).
static void *pwc_ctype_handler_table[PWC_CTYPE_MAX];
44 
45 //static inline uint32_t photon_pwc_lcid_size(photonRequest req) {
46 // photon_cid *lid = &req->local_info.id;
47 // return (lid->size) ? lid->size : sizeof(lid->u64);
48 //}
49 
50 static inline uint32_t photon_pwc_rcid_size(photonRequest req) {
51  photon_cid *rid = &req->remote_info.id;
52  return (rid->size) ? rid->size : sizeof(rid->u64);
53 }
54 
55 static inline uint64_t photon_pwc_packed_asize(photonRequest req) {
56  uint32_t rsize = photon_pwc_rcid_size(req);
57  return ALIGN(EB_MSG_SIZE(req->size + rsize), PWC_ALIGN);
58 }
59 
60 static inline int photon_pwc_pack_cid(photonRequest req,
62  photon_cid *rid = &(req->remote_info.id);
63  uint32_t csize = (rid->size) ? rid->size : sizeof(rid->u64);
64 
65  e->hdr.type = req->chdr.type;
66  e->hdr.cmd = req->chdr.cmd;
67  e->hdr.size = csize;
68 
69  if (rid->size) {
70  memcpy(e->data, rid->data, csize);
71  }
72  else {
73  memcpy(e->data, &(rid->u64), csize);
74  }
75  return PHOTON_OK;
76 }
77 
78 static inline int photon_pwc_save_lcid(photonRequest req) {
79  photon_cid *lid = &(req->local_info.id);
80  if (lid->size) {
81  void *p = malloc(lid->size);
82  memcpy(p, lid->data, lid->size);
83  lid->data = p;
84  }
85  return PHOTON_OK;
86 }
87 
88 static inline int photon_pwc_check_gwc_align(int proc, photonBuffer lbuf,
89  photonBuffer rbuf, uint64_t size) {
90  int *align;
91  int asize = 1;
92  int rc;
93  photonBackend be = photon_processes[proc].backend;
94 
95  // default to no alignment constraint
96  align = &asize;
97 
98  rc = be->get_info(NULL, 0, (void**)&align, &asize, PHOTON_GET_ALIGN);
99  if (rc != PHOTON_OK) {
100  dbg_warn("Could not get alignment info from backend");
101  }
102 
103  if (!TEST_ALIGN(lbuf->addr, *align) ||
104  !TEST_ALIGN(rbuf->addr, *align) ||
105  !TEST_ALIGN(size, *align)) {
106  return PHOTON_ERROR;
107  }
108 
109  return PHOTON_OK;
110 }
111 
112 static int photon_pwc_process_command(int proc, photon_cid_recv_hdr *rhdr,
113  photon_cid_hdr *chdr, void *cdata,
114  uintptr_t addr, uint16_t size, void *ptr) {
115 
116  int ctype = chdr->type;
117  int cmd = chdr->cmd;
118 
119  switch (ctype) {
120  case PWC_CTYPE_INTERNAL:
121  {
122  switch (cmd) {
123  case PWC_COMMAND_PWC_REQ:
124  {
125  int rc;
126  photon_cid cid = {
127  .u64 = (uint64_t)addr,
128  .size = 0
129  };
130  struct photon_buffer_t lbuf, rbuf;
131  // make sure we took this branch with a valid payload
132  assert(ptr);
133  // switch the sent lbuf/rbuf
134  memcpy(&rbuf, ptr, sizeof(rbuf));
135  memcpy(&lbuf, ptr+sizeof(rbuf), sizeof(lbuf));
136  rc = _photon_put_with_completion(proc, lbuf.size, &lbuf, &rbuf, cid, cid,
139  if (rc != PHOTON_OK) {
140  log_err("Could not complete PWC_REQ command");
141  goto error_exit;
142  }
143  }
144  break;
145  case PWC_COMMAND_PWC_REP:
146  {
147  // cid encodes the GWC request that initiated the PWC
148  photonRequest req;
149  photon_rid rid = *(photon_rid*)cdata;
150 
151  // copy the payload if reply was packed
152  if (addr && size) {
153  memcpy((void*)addr, ptr, size);
154 
155  // if the message was packed, we're done
156  req = photon_lookup_request(rid);
157  if (!req) {
158  log_err("Could not find request in PWC_REP");
159  goto error_exit;
160  }
161  photon_pwc_add_req(req);
162  }
163  else {
164  // save the rid until we know the 2-put completed
165  sync_store(&(rhdr->req), rid, SYNC_RELAXED);
166  }
167  }
168  break;
169  default:
170  log_err("Unknown INTERNAL command received: 0x%016lx", cmd);
171  break;
172  }
173  }
174  break;
175  default:
176  {
177  // hand off the completion info to a registered handler for the ctype
178  void (*handler)(int, pwc_command, photon_cid, void*, int);
179  handler = pwc_ctype_handler_table[ctype];
180  if (handler) {
181  photon_cid cid = {
182  .data = cdata,
183  .size = chdr->size
184  };
185  handler(proc, cmd, cid, ptr, size);
186  }
187  else {
188  dbg_info("No handler defined for ctype %d", ctype);
189  }
190  }
191  break;
192  }
193 
194  return PHOTON_OK;
195 
196  error_exit:
197  return PHOTON_ERROR;
198 }
199 
200 static int photon_pwc_handle_comp_req(photonRequest req, int *flag, photon_cid *c,
201  void (*cb)(photon_cid)) {
202  int rc;
203 
204  // sometimes the requestor doesn't care about the completion
205  if (! (req->flags & REQUEST_FLAG_NO_LCE)) {
206  *flag = 1;
207  *c = req->local_info.id;
208 
209  // invoke the callback (CID data is freed with request)
210  if (cb) {
211  cb(*c);
212  }
213  }
214 
215  // in addition to the callback, invoke any internal handlers
216  // registered for the PWC ctype used in the request
217  void (*handler)(int, pwc_command, photon_cid, void*, int);
218  handler = pwc_ctype_handler_table[req->chdr.type];
219  if (handler) {
220  handler(_photon_myrank, req->chdr.cmd, req->local_info.id, NULL, 0);
221  }
222  else {
223  dbg_trace("No handler defined for ctype %d", req->chdr.type);
224  }
225 
226  if (req->flags & REQUEST_FLAG_ROP) {
227  // sends a remote completion after the GWC
228  // this GWC request now becomes a PWC
229  // and we reap the put completion internally
230  req->flags |= (REQUEST_FLAG_NO_LCE | REQUEST_FLAG_CMD);
231  req->flags &= ~(REQUEST_FLAG_ROP);
232  req->rattr.cookie = req->id;
233  rc = photon_pwc_safe_ledger(req);
234  if (rc != PHOTON_OK) {
236  rt = photon_processes[req->proc].request_table;
237  sync_two_lock_queue_enqueue(rt->pwc_q, req);
238  sync_fadd(&rt->pcount, 1, SYNC_RELAXED);
239  sync_fadd(&qcount, 1, SYNC_RELAXED);
240  dbg_trace("Enqueing ROP PWC req: 0x%016lx", req->id);
241  }
242  goto no_free;
243  }
244 
245  dbg_trace("Completed and removing PWC/GWC request: 0x%016lx (lid=0x%016lx)",
246  req->id, req->local_info.id);
247  photon_free_request(req);
248 
249  no_free:
250  return PHOTON_OK;
251 }
252 
253 static int photon_pwc_process_queued_gwc(int proc, photonRequestTable rt) {
254  photonRequest req;
255  photonBackend be = photon_processes[proc].backend;
256  uint32_t val;
257  int offset, rc;
258 
259  val = sync_load(&rt->gcount, SYNC_RELAXED);
260  if (!val) {
261  goto pack;
262  }
263 
264  sync_tatas_acquire(&rt->gq_loc);
265  {
266  // try to get a regular queued gwc first
267  val = sync_load(&rt->gcount, SYNC_RELAXED);
268  if (val) {
269  rc = be->tx_size_left(proc);
270  if (rc < 1) {
271  sync_tatas_release(&rt->gq_loc);
272  goto error_resource;
273  }
274  req = sync_two_lock_queue_dequeue(rt->gwc_q);
275  rc = photon_pwc_try_gwc(req);
276  if (rc != PHOTON_OK) {
277  sync_tatas_release(&rt->gq_loc);
278  goto error_exit;
279  }
280  sync_fadd(&rt->gcount, -1, SYNC_RELAXED);
281  sync_fadd(&qcount, -1, SYNC_RELAXED);
282  sync_tatas_release(&rt->gq_loc);
283  goto exit_ok;
284  }
285  }
286  sync_tatas_release(&rt->gq_loc);
287 
288  pack:
289  val = sync_load(&rt->gpcount, SYNC_RELAXED);
290  if (!val) {
291  goto error_exit;
292  }
293 
294  sync_tatas_acquire(&rt->pack_loc);
295  {
296  // try to get a queued gwc-pwc request
297  val = sync_load(&rt->gpcount, SYNC_RELAXED);
298  if (val) {
299  rc = photon_pwc_test_gwc_pwc(proc, &offset);
300  if (rc == PHOTON_ERROR_RESOURCE) {
301  sync_tatas_release(&rt->pack_loc);
302  goto error_resource;
303  }
304  else if (rc == PHOTON_OK) {
305  req = sync_two_lock_queue_dequeue(rt->gwc_pwc_q);
306  rc = photon_pwc_try_gwc_pwc(req, offset);
307  if (rc != PHOTON_OK) {
308  dbg_err("Could not send queued PWC request");
309  sync_tatas_release(&rt->pack_loc);
310  goto error_exit;
311  }
312  sync_fadd(&rt->gpcount, -1, SYNC_RELAXED);
313  sync_fadd(&qcount, -1, SYNC_RELAXED);
314  sync_tatas_release(&rt->pack_loc);
315  goto exit_ok;
316  }
317  }
318  }
319  sync_tatas_release(&rt->pack_loc);
320 
321  error_exit:
322  return PHOTON_ERROR;
323 
324  error_resource:
325  return PHOTON_ERROR_RESOURCE;
326 
327  exit_ok:
328  return PHOTON_OK;
329 }
330 
331 static int photon_pwc_process_queued_pwc(int proc, photonRequestTable rt) {
332  photonRequest req;
333  uint32_t val;
334  int offset, rc;
335 
336  val = sync_load(&rt->pcount, SYNC_RELAXED);
337  if (!val) {
338  return PHOTON_ERROR;
339  }
340 
341  sync_tatas_acquire(&rt->pq_loc);
342  {
343  val = sync_load(&rt->pcount, SYNC_RELAXED);
344  if (!val) {
345  goto error_exit;
346  }
347  // only dequeue a request if there is one and we can send it
348  rc = photon_pwc_test_ledger(proc, &offset);
349  if (rc == PHOTON_OK) {
350  req = sync_two_lock_queue_dequeue(rt->pwc_q);
351  rc = photon_pwc_try_ledger(req, offset);
352  if (rc != PHOTON_OK) {
353  dbg_err("Could not send queued PWC request");
354  goto error_exit;
355  }
356  sync_fadd(&rt->pcount, -1, SYNC_RELAXED);
357  sync_fadd(&qcount, -1, SYNC_RELAXED);
358  }
359  else {
360  goto error_resource;
361  }
362  }
363  sync_tatas_release(&rt->pq_loc);
364 
365  return PHOTON_OK;
366 
367  error_resource:
368  sync_tatas_release(&rt->pq_loc);
369  return PHOTON_ERROR_RESOURCE;
370 
371  error_exit:
372  sync_tatas_release(&rt->pq_loc);
373  return PHOTON_ERROR;
374 }
375 
376 static int photon_pwc_try_gwc_pwc(photonRequest req, int offset) {
377  photonEagerBuf eb;
378  photonBackend be = photon_processes[req->proc].backend;
379  photon_eb_hdr *hdr;
380  uint64_t asize, rsize, imm_data = 0;
381  uintptr_t eager_addr;
382  uint8_t *tail;
383  void *dptr;
384  int rc;
385 
386  dbg_trace("Performing GWC-PUT for req: 0x%016lx", req->id);
387 
388  // make sure the original request size is encoded in the local/remote bufs
389  req->local_info.buf.size = req->size;
390  req->remote_info.buf.size = req->size;
391 
392  req->flags |= REQUEST_FLAG_1PWC;
393  req->rattr.events = 1; // N/A, waits for PWC reply from remote
394  req->rattr.cookie = ( (uint64_t)REQUEST_COOK_GPWC<<32) | req->proc;
395 
396  rsize = 2 * sizeof(struct photon_buffer_t);
397  asize = ALIGN(EB_MSG_SIZE(rsize), PWC_ALIGN);
398  req->psize = asize; // accounting
399 
400  eb = photon_processes[req->proc].remote_pwc_buf;
401  eager_addr = (uintptr_t)eb->remote.addr + offset;
402  hdr = (photon_eb_hdr *)&(eb->data[offset]);
403  hdr->header = UINT8_MAX;
406  hdr->cent.size = 0; // no need to send CID data here
407  hdr->addr = req->id;
408  hdr->length = rsize;
409  hdr->footer = UINT8_MAX;
410 
411  // copy the local buffer
412  dptr = (void*)((uintptr_t)hdr + sizeof(*hdr));
413  memcpy(dptr, (void*)&req->local_info.buf, sizeof(req->local_info.buf));
414 
415  // copy the remote buffer
416  dptr += sizeof(req->local_info.buf);
417  memcpy(dptr, (void*)&req->remote_info.buf, sizeof(req->remote_info.buf));
418 
419  // set a tail flag, the last byte in aligned buffer
420  tail = (uint8_t*)(uintptr_t)((uintptr_t)hdr + asize - 1);
421  *tail = UINT8_MAX;
422 
423  imm_data = ENCODE_RCQ_32(PHOTON_ETYPE_NTFY,
426  offset,
428 
429  rc = be->rdma_put(req->proc, (uintptr_t)hdr, (uintptr_t)eager_addr, asize,
430  &(shared_storage->bint.buf), &eb->remote, req->rattr.cookie,
431  imm_data, RDMA_FLAG_WITH_IMM);
432  if (rc != PHOTON_OK) {
433  dbg_err("RDMA PUT (GWC-PUT) failed for 0x%016lx", req->rattr.cookie);
434  goto error_exit;
435  }
436 
437  dbg_trace("Posted GWC-PUT Request: 0x%016lx", req->id);
438 
439  return PHOTON_OK;
440 
441  error_exit:
442  return PHOTON_ERROR;
443 }
444 
445 static int photon_pwc_try_gwc(photonRequest req) {
446  int rc;
447  photonBackend be = photon_processes[req->proc].backend;
448  rc = be->rdma_get(req->proc, req->local_info.buf.addr,
449  req->remote_info.buf.addr, req->size,
450  &req->local_info.buf,
451  &req->remote_info.buf,
452  req->rattr.cookie, RDMA_FLAG_NIL);
453 
454  if (rc != PHOTON_OK) {
455  dbg_err("RDMA GET (PWC data) failed for 0x%016lx", req->rattr.cookie);
456  goto error_exit;
457  }
458 
459  dbg_trace("Posted GWC Request: %d/0x%016lx/0x%016lx", req->proc,
460  req->local_info.id,
461  req->remote_info.id);
462 
463  return PHOTON_OK;
464 
465  error_exit:
466  return PHOTON_ERROR;
467 }
468 
469 static int photon_pwc_test_gwc_pwc(int proc, int *ret_offset) {
470  int rc, offset;
471  int rsize, asize;
472 
473  // determine if there's space to initiate the GWC-PWC request
474  rsize = 2 * sizeof(struct photon_buffer_t);
475  asize = ALIGN(EB_MSG_SIZE(rsize), PWC_ALIGN);
476 
477  rc = photon_pwc_test_packed(proc, asize, &offset);
478  if (rc == PHOTON_OK) {
479  *ret_offset = offset;
480  }
481 
482  return rc;
483 }
484 
485 static int photon_pwc_safe_packed(photonRequest req) {
487  int rc, offset;
488  uint64_t asize;
489 
490  // only do packed if size makes sense
491  if ((req->size <= 0) ||
492  (req->size > _photon_upsize) ||
493  (req->size > _photon_pbsize)) {
494  return PHOTON_ERROR_RESOURCE;
495  }
496 
497  // keep buffer offsets aligned
498  asize = photon_pwc_packed_asize(req);
499 
500  rt = photon_processes[req->proc].request_table;
501  sync_tatas_acquire(&rt->pack_loc);
502  {
503  rc = photon_pwc_test_packed(req->proc, asize, &offset);
504  if (rc == PHOTON_OK) {
505  req->psize = asize; // accounting
506  rc = photon_pwc_try_packed(req, offset);
507  }
508  }
509  sync_tatas_release(&rt->pack_loc);
510 
511  return rc;
512 }
513 
514 static int photon_pwc_test_packed(int proc, uint64_t size, int *ret_offset) {
515  photonEagerBuf eb;
516  int offset;
517 
518  eb = photon_processes[proc].remote_pwc_buf;
519  offset = photon_rdma_eager_buf_get_offset(proc, eb, size, PWC_MAX_PACKED);
520  if (offset < 0) {
521  return PHOTON_ERROR_RESOURCE;
522  }
523  *ret_offset = offset;
524  return PHOTON_OK;
525 }
526 
527 static int photon_pwc_try_packed(photonRequest req, int offset) {
528  // see if we should pack into an eager buffer and send in one put
529  photonEagerBuf eb;
530  photonBackend be = photon_processes[req->proc].backend;
531  photon_cid *rid;
532  photon_eb_hdr *hdr;
533  uint64_t asize, imm_data = 0;
534  uintptr_t eager_addr;
535  void *dptr;
536  uint8_t *tail;
537  uint32_t rsize;
538  int rc;
539 
540  req->flags |= REQUEST_FLAG_1PWC;
541  req->rattr.events = 1;
542 
543  rid = &(req->remote_info.id);
544  rsize = photon_pwc_rcid_size(req);
545  asize = photon_pwc_packed_asize(req);
546 
547  eb = photon_processes[req->proc].remote_pwc_buf;
548  eager_addr = (uintptr_t)eb->remote.addr + offset;
549  hdr = (photon_eb_hdr *)&(eb->data[offset]);
550  hdr->header = UINT8_MAX;
551  hdr->cent.type = req->chdr.type;
552  hdr->cent.cmd = req->chdr.cmd;
553  hdr->cent.size = rsize;
554  hdr->addr = req->remote_info.buf.addr;
555  hdr->length = (uint16_t)req->size;
556  hdr->footer = UINT8_MAX;
557 
558  // copy in the cid
559  dptr = (void*)((uintptr_t)hdr + sizeof(*hdr));
560  if (rid->size) {
561  memcpy(dptr, rid->data, rsize);
562  }
563  else {
564  memcpy(dptr, &(rid->u64), rsize);
565  }
566 
567  // copy the payload
568  dptr += rsize;
569  memcpy(dptr, (void*)req->local_info.buf.addr, req->size);
570 
571  // set a tail flag, the last byte in aligned buffer
572  tail = (uint8_t*)(uintptr_t)((uintptr_t)hdr + asize - 1);
573  *tail = UINT8_MAX;
574 
575  imm_data = ENCODE_RCQ_32(PHOTON_ETYPE_NTFY,
578  offset,
580 
581  rc = be->rdma_put(req->proc, (uintptr_t)hdr, (uintptr_t)eager_addr, asize,
582  &(shared_storage->bint.buf), &eb->remote, req->rattr.cookie,
583  imm_data, RDMA_FLAG_WITH_IMM);
584  if (rc != PHOTON_OK) {
585  dbg_err("RDMA PUT (PWC EAGER) failed for 0x%016lx", req->rattr.cookie);
586  goto error_exit;
587  }
588 
589  dbg_trace("Posted PWC Request: %d/0x%016lx/0x%016lx", req->proc,
590  req->local_info.id,
591  req->remote_info.id);
592 
593  return PHOTON_OK;
594 
595  error_exit:
596  return PHOTON_ERROR;
597 }
598 
599 static int photon_pwc_safe_ledger(photonRequest req) {
600  int rc, offset;
602  sync_tatas_acquire(&rt->pq_loc);
603  {
604  rc = photon_pwc_test_ledger(req->proc, &offset);
605  if (rc == PHOTON_OK) {
606  rc = photon_pwc_try_ledger(req, offset);
607  }
608  }
609  sync_tatas_release(&rt->pq_loc);
610  return rc;
611 }
612 
613 static int photon_pwc_test_ledger(int proc, int *ret_offset) {
614  photonLedger l;
615  int curr;
617  curr = photon_rdma_ledger_get_next(proc, l);
618  if (curr < 0) {
619  return PHOTON_ERROR_RESOURCE;
620  }
621  *ret_offset = curr;
622  return PHOTON_OK;
623 }
624 
625 static int photon_pwc_try_ledger(photonRequest req, int curr) {
627  photonBackend be = photon_processes[req->proc].backend;
628  uintptr_t rmt_addr;
629  uint64_t imm_data = 0;
630  int rc, iflag, rflags;
631 
632  req->flags |= REQUEST_FLAG_2PWC;
633  req->rattr.events = 1;
634 
635  if ((req->size) > 0 && !(req->flags & REQUEST_FLAG_CMD)) {
636  if (!req->local_info.buf.priv.key0 && !req->local_info.buf.priv.key1) {
637  if (buffertable_find_containing( (void *)req->local_info.buf.addr,
638  req->size, &db) != 0) {
639  log_err("Tried posting from a buffer that's not registered");
640  goto error_exit;
641  }
642  req->local_info.buf.priv = db->bint.buf.priv;
643  }
644 
645  if (! (req->flags & REQUEST_FLAG_NO_RCE)) {
646  imm_data = ENCODE_RCQ_32(PHOTON_ETYPE_DATA,
649  curr,
651  rflags = RDMA_FLAG_WITH_IMM;
652  req->rattr.events = 2;
653  }
654  else {
655  rflags = RDMA_FLAG_NIL;
656  }
657 
658  rc = be->rdma_put(req->proc,
659  req->local_info.buf.addr,
660  req->remote_info.buf.addr,
661  req->size,
662  &req->local_info.buf,
663  &req->remote_info.buf,
664  req->rattr.cookie,
665  imm_data,
666  rflags);
667  if (rc != PHOTON_OK) {
668  dbg_err("RDMA PUT (PWC data) failed for 0x%016lx", req->rattr.cookie);
669  goto error_exit;
670  }
671  iflag = PHOTON_EFLAG_TWO;
672  }
673  else {
674  iflag = PHOTON_EFLAG_ONE;
675  }
676 
677  if (! (req->flags & REQUEST_FLAG_NO_RCE)) {
679  photonCIDLedgerEntry entry = (photonCIDLedgerEntry)((uintptr_t)l->entries +
680  (curr * l->entry_size));
681 
682  photon_pwc_pack_cid(req, entry);
683 
684  rmt_addr = l->remote.addr + (curr * PHOTON_CID_RECV_ENTRY_SIZE) +
685  sizeof(photon_cid_recv_hdr);
686 
687  imm_data = ENCODE_RCQ_32(PHOTON_ETYPE_NTFY,
689  iflag,
690  curr,
692 
693  rc = be->rdma_put(req->proc, (uintptr_t)entry, rmt_addr,
694  sizeof(*entry)+entry->hdr.size,
695  &(shared_storage->bint.buf),
696  &(l->remote), req->rattr.cookie,
697  imm_data, RDMA_FLAG_WITH_IMM);
698 
699  if (rc != PHOTON_OK) {
700  dbg_err("RDMA PUT (PWC comp) failed for 0x%016lx", req->rattr.cookie);
701  goto error_exit;
702  }
703  }
704 
705  dbg_trace("Posted PWC Request: %d/0x%016lx/0x%016lx/0x%016lx", req->proc,
706  req->rattr.cookie,
707  req->local_info.id,
708  req->remote_info.id);
709 
710  return PHOTON_OK;
711 
712  error_exit:
713  return PHOTON_ERROR;
714 }
715 
716 int photon_pwc_init(photonConfig cfg) {
717  comp_q = sync_two_lock_queue_new();
718  if (!comp_q) {
719  log_err("Could not allocate PWC completion queue");
720  goto error_exit;
721  }
722 
723  pbufs = (uint16_t**)malloc(_photon_nproc*sizeof(uint16_t*));
724  if (!pbufs) {
725  log_err("Could not allocate PWC recv acct buffers");
726  goto error_exit;
727  }
728  for (int i = 0; i < _photon_nproc; i++) {
729  pbufs[i] = (uint16_t*)calloc(_photon_pbsize, sizeof(uint16_t));
730  }
731 
732  // initialize ctype handlers to NULL
733  for (int i = 0; i < PWC_CTYPE_MAX; i++) {
734  pwc_ctype_handler_table[i] = NULL;
735  }
736 
737  return PHOTON_OK;
738 
739  error_exit:
740  return PHOTON_ERROR;
741 }
742 
744  pwc_ctype_handler_table[type] = handler;
745  dbg_trace("Registered ctype %d handler %p", type, handler);
746  return PHOTON_OK;
747 }
748 
750  sync_two_lock_queue_enqueue(comp_q, req);
751  dbg_trace("Enqueing completed request: 0x%016lx", req->id);
752  return PHOTON_OK;
753 }
754 
756  return sync_two_lock_queue_dequeue(comp_q);
757 }
758 
759 int _photon_put_with_completion(int proc, uint64_t size,
760  photonBuffer lbuf,
761  photonBuffer rbuf,
762  photon_cid local, photon_cid remote,
763  int flags,
764  pwc_cid_type type, pwc_command cmd) {
765  photonRequest req;
767  int rc;
768 
769  dbg_trace("(%d, size: %lu, lid: 0x%016lx, rid: 0x%016lx, flags: %d)", proc,
770  size, local.u64, remote.u64, flags);
771 
772  if (local.size && local.size > _photon_idsize) {
773  log_err("Local CID size is larger than configured max: %d/%d",
774  local.size, _photon_idsize);
775  goto error_exit;
776  }
777 
778  if (remote.size && remote.size > _photon_idsize) {
779  log_err("Remote CID size is larger than configured max: %d/%d",
780  remote.size, _photon_idsize);
781  goto error_exit;
782  }
783 
784  if (size && !lbuf) {
785  log_err("Trying to put size %lu and NULL lbuf", size);
786  goto error_exit;
787  }
788 
789  if (size && !rbuf) {
790  log_err("Trying to put size %lu and NULL rbuf", size);
791  goto error_exit;
792  }
793 
794  if (!size && (flags & PHOTON_REQ_PWC_NO_RCE)) {
795  dbg_warn("Nothing to send and no remote completion requested!");
796  return PHOTON_OK;
797  }
798 
799  req = photon_setup_request_direct(lbuf, rbuf, size, proc, 0);
800  if (!req) {
801  dbg_err("Could not allocate request");
802  goto error_exit;
803  }
804 
805  req->op = REQUEST_OP_PWC;
806  req->chdr.type = type;
807  req->chdr.cmd = cmd;
808  req->local_info.id = local;
809  req->remote_info.id = remote;
810 
811  photon_pwc_save_lcid(req);
812 
813  // control the return of the local id
814  if (flags & PHOTON_REQ_PWC_NO_LCE) {
815  req->flags |= REQUEST_FLAG_NO_LCE;
816  }
817 
818  // control the return of the remote id
819  if (flags & PHOTON_REQ_PWC_NO_RCE) {
820  req->flags |= REQUEST_FLAG_NO_RCE;
821  return photon_pwc_try_ledger(req, 0);
822  }
823 
824  rt = photon_processes[proc].request_table;
825 
826  // process any queued requests for this peer first
827  rc = photon_pwc_process_queued_pwc(proc, rt);
828  if ((rc == PHOTON_OK) ||
829  (rc == PHOTON_ERROR_RESOURCE)) {
830  goto queue_exit;
831  }
832 
833  // otherwise try to send the current request
834  rc = photon_pwc_safe_packed(req);
835  if (rc == PHOTON_ERROR_RESOURCE) {
836  rc = photon_pwc_safe_ledger(req);
837  }
838 
839  if ((rc == PHOTON_ERROR) || (rc == PHOTON_OK)) {
840  return rc;
841  }
842 
843  queue_exit:
844  sync_two_lock_queue_enqueue(rt->pwc_q, req);
845  sync_fadd(&rt->pcount, 1, SYNC_RELAXED);
846  sync_fadd(&qcount, 1, SYNC_RELAXED);
847  dbg_trace("Enqueued PWC request: 0x%016lx", req->id);
848  return PHOTON_OK;
849 
850  error_exit:
851  return PHOTON_ERROR;
852 }
853 
854 int _photon_get_with_completion(int proc, uint64_t size,
855  photonBuffer lbuf,
856  photonBuffer rbuf,
857  photon_cid local, photon_cid remote,
858  int flags,
859  pwc_cid_type type, pwc_command cmd) {
860  photonRequest req;
862  photonBackend be = photon_processes[proc].backend;
864  struct photon_buffer_t nlbuf;
865  int rc, aok;
866 
867  dbg_trace("(%d, size: %lu, lid: 0x%016lx, rid: 0x%016lx, flags: %d)", proc,
868  size, local.u64, remote.u64, flags);
869 
870  if (local.size && local.size > _photon_idsize) {
871  log_err("Local CID size is larger than configured max: %d/%d",
872  local.size, _photon_idsize);
873  goto error_exit;
874  }
875 
876  if (remote.size && remote.size > _photon_idsize) {
877  log_err("Remote CID size is larger than configured max: %d/%d",
878  remote.size, _photon_idsize);
879  goto error_exit;
880  }
881 
882  if (size && !rbuf) {
883  log_err("Tring to get size %lu and NULL rbuf", size);
884  goto error_exit;
885  }
886 
887  if (!size && (flags & PHOTON_REQ_PWC_NO_RCE)) {
888  dbg_warn("Nothing to get and no remote completion requested!");
889  return PHOTON_OK;
890  }
891 
892  // always make sure local buffer is registered for gets
893  nlbuf.addr = lbuf->addr;
894  nlbuf.size = lbuf->size;
895  nlbuf.priv = lbuf->priv;
896  if (size && !nlbuf.priv.key0 && !nlbuf.priv.key1) {
897  if (buffertable_find_containing( (void *)nlbuf.addr,
898  size, &db) != 0) {
899  log_err("Tried get into a buffer that's not registered");
900  goto error_exit;
901  }
902  nlbuf.priv = db->bint.buf.priv;
903  }
904 
905  req = photon_setup_request_direct(&nlbuf, rbuf, size, proc, 1);
906  if (req == NULL) {
907  dbg_trace("Could not setup direct buffer request");
908  goto error_exit;
909  }
910 
911  req->op = REQUEST_OP_PWC;
912  req->chdr.type = type;
913  req->chdr.cmd = cmd;
914  req->local_info.id = local;
915  req->remote_info.id = remote;
916 
917  photon_pwc_save_lcid(req);
918 
919  // control the return of the local id
920  if (flags & PHOTON_REQ_PWC_NO_LCE) {
921  req->flags |= REQUEST_FLAG_NO_LCE;
922  }
923 
924  // control the sending of the remote id to proc
925  if (! (flags & PHOTON_REQ_PWC_NO_RCE)) {
926  req->flags |= REQUEST_FLAG_ROP;
927  }
928 
929  // we are already done if there's nothing to get
930  // completing the request will send the remote cid
931  if (!req->size) {
932  photon_pwc_add_req(req);
933  return PHOTON_OK;
934  }
935 
936  aok = photon_pwc_check_gwc_align(req->proc,
937  &req->local_info.buf,
938  &req->remote_info.buf,
939  req->size);
940 
941  // process any queued requests for this peer first
942  rt = photon_processes[proc].request_table;
943  rc = photon_pwc_process_queued_gwc(proc, rt);
944  if ((rc == PHOTON_OK) ||
945  (rc == PHOTON_ERROR_RESOURCE)) {
946  if (aok == PHOTON_OK)
947  goto queue_gwc_exit;
948  else
949  goto queue_gwc_pwc_exit;
950  }
951 
952  if (aok == PHOTON_OK) {
953  // do a normal gwc
954  // make sure there are TX resources first
955  rc = be->tx_size_left(proc);
956  if (rc < 1) {
957  goto queue_gwc_exit;
958  }
959  rc = photon_pwc_try_gwc(req);
960  }
961  else {
962  // do a gwc-pwc to handle the unsupported alignment
963  int offset = 0;
964  sync_tatas_acquire(&rt->pack_loc);
965  {
966  rc = photon_pwc_test_gwc_pwc(proc, &offset);
967  if (rc == PHOTON_ERROR_RESOURCE) {
968  sync_tatas_release(&rt->pack_loc);
969  goto queue_gwc_pwc_exit;
970  }
971  rc = photon_pwc_try_gwc_pwc(req, offset);
972  }
973  sync_tatas_release(&rt->pack_loc);
974  }
975 
976  return rc;
977 
978  queue_gwc_exit:
979  sync_two_lock_queue_enqueue(rt->gwc_q, req);
980  sync_fadd(&rt->gcount, 1, SYNC_RELAXED);
981  sync_fadd(&qcount, 1, SYNC_RELAXED);
982  dbg_trace("Enqueued GWC request: 0x%016lx", req->id);
983  return PHOTON_OK;
984 
985  queue_gwc_pwc_exit:
986  sync_two_lock_queue_enqueue(rt->gwc_pwc_q, req);
987  sync_fadd(&rt->gpcount, 1, SYNC_RELAXED);
988  sync_fadd(&qcount, 1, SYNC_RELAXED);
989  dbg_trace("Enqueued GWC-PWC request: 0x%016lx", req->id);
990  return PHOTON_OK;
991 
992  error_exit:
993  return PHOTON_ERROR;
994 }
995 
996 static int photon_pwc_probe_local(int proc, int *flag, photon_cid *c,
997  void (*cb)(photon_cid)) {
998  photonRequest req;
999  photon_rid cookie = NULL_REQUEST;
1000  int rc;
1001 
1002  // handle any pwc requests that were popped in some other path
1003  req = photon_pwc_pop_req(proc);
1004  if (req != NULL) {
1005  photon_pwc_handle_comp_req(req, flag, c, cb);
1006  goto exit;
1007  }
1008 
1009  rc = __photon_get_event(proc, &cookie);
1010  if (rc == PHOTON_EVENT_ERROR) {
1011  dbg_err("Error getting event, rc=%d", rc);
1012  goto error_exit;
1013  }
1014  else if (rc == PHOTON_EVENT_OK) {
1015  // we found an event to process
1016  rc = __photon_handle_cq_event(NULL, cookie, &req);
1017  if (rc == PHOTON_EVENT_ERROR) {
1018  goto error_exit;
1019  }
1020  else if ((rc == PHOTON_EVENT_REQCOMP) && req &&
1021  (req->op == REQUEST_OP_PWC)) {
1022  photon_pwc_handle_comp_req(req, flag, c, cb);
1023  goto exit;
1024  }
1025  else {
1026  dbg_trace("PWC probe handled non-completion event: 0x%016lx", cookie);
1027  }
1028  }
1029 
1030  return PHOTON_EVENT_NONE;
1031 
1032  exit:
1033  return PHOTON_EVENT_REQFOUND;
1034 
1035  error_exit:
1036  return PHOTON_EVENT_ERROR;
1037 }
1038 
// Record that a packed message in an eager buffer has been consumed, and
// advance the buffer's progress counter when possible.
//
// eb     - eager buffer whose `prog` counter tracks consumed bytes
// proc   - peer rank; selects the pbufs[proc] bookkeeping array
// offset - byte offset of the message that was just consumed
// size   - number of bytes that message occupied in the buffer
//
// If `offset` is not the current progress position, the message was consumed
// out of order. In that case its size is only remembered in
// pbufs[proc][offset].
//
// If `offset` is the current progress position, progress is advanced past
// this message. It is also advanced past every immediately following message
// whose size was already recorded; each such record is cleared as it is
// absorbed.
//
// Always returns PHOTON_OK.
//
// NOTE(review): masking with (eb->size - 1) assumes eb->size is a power of two.
static inline int photon_pwc_handle_pooo(photonEagerBuf eb, int proc,
                                         int offset, int size) {
  uint64_t left;
  uint64_t prog = sync_load(&(eb->prog), SYNC_RELAXED);
  // current progress position within the circular buffer
  int p = prog & (eb->size - 1);
  if (p != offset) {
    // consumed ahead of the progress point: remember its size for later
    pbufs[proc][offset] = size;
  }
  else {
    int padd = size;   // total bytes to add to eb->prog
    int val = size;    // size of the message just stepped over
    int iter = p;
    do {
      iter = (iter+val) & (eb->size - 1);
      left = eb->size - iter;
      // A tail shorter than PWC_MAX_PACKED is counted as consumed and the
      // scan wraps to offset 0. Presumably senders never place a message
      // there; confirm against photon_rdma_eager_buf_get_offset.
      if (left < PWC_MAX_PACKED) {
        padd += left;
        iter = 0;
      }
      val = pbufs[proc][iter];
      if (val) {
        // absorb a previously recorded out-of-order message
        sync_store(&pbufs[proc][iter], 0, SYNC_RELAXED);
        padd += val;
      }
    } while (val);
    sync_fadd(&eb->prog, padd, SYNC_RELAXED);
  }

  return PHOTON_OK;
}
1069 
1070 static inline int photon_pwc_handle_looo(photonLedger l, uint64_t imm,
1071  photon_cid *comp, int do_comp, int *src,
1072  void (*cb)(photon_cid), int *flag) {
1073 
1074  int type = DECODE_RCQ_32_TYPE(imm);
1075  int proc = DECODE_RCQ_32_PROC(imm);
1076  int rflg = DECODE_RCQ_32_FLAG(imm);
1077  int loc = (type==PHOTON_ETYPE_DATA) ? DECODE_RCQ_32_CURR(imm) : *src;
1078 
1080  (photonCIDRecvLedgerEntry)((uintptr_t)l->entries +
1081  (loc * PHOTON_CID_RECV_ENTRY_SIZE));
1082  photon_cid_recv_hdr *rhdr = &le->rhdr;
1083  photon_cid_hdr *hdr = &le->hdr;
1084 
1085  // now deal with the ordering of 2-put
1086  switch (type) {
1087  case PHOTON_ETYPE_DATA:
1088  {
1089  int count = sync_addf(&(rhdr->cnt), 1, SYNC_RELAXED);
1090  if (count == 2) {
1091  *flag = 1;
1092  *src = proc;
1093  comp->size = hdr->size;
1094  comp->data = (void*)le->data;
1095  sync_store(&(rhdr->cnt), 0, SYNC_RELAXED);
1096  goto exit_advance;
1097  }
1098  break;
1099  }
1100  case PHOTON_ETYPE_NTFY:
1101  {
1102  // we're done if this is a single ledger put
1103  if (rflg == PHOTON_EFLAG_ONE) {
1104  *flag = 1;
1105  goto exit_advance;
1106  }
1107 
1108  int count = sync_addf(&(rhdr->cnt), 1, SYNC_RELAXED);
1109  if (count == 2) {
1110  *flag = 1;
1111  sync_store(&(rhdr->cnt), 0, SYNC_RELAXED);
1112  goto exit_advance;
1113  }
1114  }
1115  break;
1116  default:
1117  break;
1118  }
1119 
1120  return PHOTON_EVENT_NONE;
1121 
1122  exit_advance:
1123  if (rhdr->req) {
1124  // pop the internal request we saved earlier
1126  if (!req) {
1127  log_err("Could not lookup saved request ID");
1128  }
1129  else {
1130  photon_pwc_add_req(req);
1131  sync_store(&(rhdr->req), NULL_REQUEST, SYNC_RELAXED);
1132  }
1133  // this also means no CID via the API
1134  *flag = 0;
1135  }
1136 
1137  // set flag only if we have a successful completion
1138  // AND we actually want to return the CID (i.e. it is CTYPE_USER)
1139  *flag = *flag && do_comp;
1140 
1141  // invoke the CID callback if we're returning completion data
1142  if (*flag && cb) {
1143  cb(*comp);
1144  }
1145 
1146  // return 64 bit CIDs by value in out param, indicated by size 0
1147  if (*flag && (comp->size == sizeof(comp->u64))) {
1148  comp->u64 = *(uint64_t*)comp->data;
1149  comp->size = 0;
1150  }
1151 
1152  // progress book keeping
1153  uint64_t prog = sync_load(&(l->prog), SYNC_RELAXED);
1154  int p = prog & (_LEDGER_SIZE - 1);
1155  if (p != loc) {
1156  // mark slot as received
1157  sync_store(&(rhdr->cnt), -1, SYNC_RELAXED);
1158  }
1159  else {
1160  int padd = 1;
1161  int val, iter = p;
1162  do {
1163  // calculate additional progress already completed due to ooo events
1164  iter = (iter+1) & (_LEDGER_SIZE - 1);
1165  le = (photonCIDRecvLedgerEntry)((uintptr_t)l->entries +
1166  (iter * PHOTON_CID_RECV_ENTRY_SIZE));
1167  val = sync_load(&(le->rhdr.cnt), SYNC_RELAXED);
1168  if (val < 0) {
1169  sync_store(&(le->rhdr.cnt), 0, SYNC_RELAXED);
1170  padd++;
1171  }
1172  } while (val < 0);
1173  // advance ledger progress
1174  sync_fadd(&(l->prog), padd, SYNC_RELAXED);
1175  }
1176 
1177  dbg_trace("Popped ledger event with id: 0x%016lx (%lu)",
1178  comp->u64, comp->u64);
1179 
1180  return PHOTON_OK;
1181 }
1182 
1183 static inline int photon_pwc_advance_ledger(photonLedger l, photon_cid *comp, int do_comp,
1184  void (*cb)(photon_cid), int *flag) {
1185  *flag = 1;
1186  // set flag only if we have a successful completion
1187  // AND we actually want to return the CID (i.e. it is CTYPE_USER)
1188  *flag = *flag && do_comp;
1189 
1190  // invoke the CID callback if we're returning completion data
1191  if (*flag && cb) {
1192  cb(*comp);
1193  }
1194 
1195  // return 64 bit CIDs by value in out param, indicated by size 0
1196  if (*flag && (comp->size == sizeof(comp->u64))) {
1197  comp->u64 = *(uint64_t*)comp->data;
1198  comp->size = 0;
1199  }
1200 
1201  // advance ledger progress by one
1202  sync_fadd(&(l->prog), 1, SYNC_RELAXED);
1203 
1204  dbg_trace("Popped ledger event with id: 0x%016lx (%lu)",
1205  comp->u64, comp->u64);
1206 
1207  return PHOTON_OK;
1208 }
1209 
1210 static int photon_pwc_probe_ledger(int proc, int *flag, photon_cid *comp,
1211  int *src, void (*cb)(photon_cid)) {
1212  photonLedger ledger;
1214  photonEagerBuf eb;
1215  photonBackend be = __photon_fabric;
1216  photon_eb_hdr *hdr;
1217  photon_rid cookie = NULL_REQUEST;
1218  uint64_t imm;
1219  int i, rc, start, end, lead, etype, pos = -1;
1220  int scan_packed = 1, scan_ledger = 1;
1221  int rcq = 0;
1222 
1223  if (proc == PHOTON_ANY_SOURCE) {
1224  rc = __photon_get_revent(proc, &cookie, &imm);
1225  if (rc == PHOTON_EVENT_OK) {
1226 
1227  etype = DECODE_RCQ_32_TYPE(imm);
1228  start = DECODE_RCQ_32_PROC(imm);
1229  end = start+1;
1230 
1231  // Check for completion of first part of a 2PWC request
1232  if (etype == PHOTON_ETYPE_DATA) {
1233  ledger = photon_processes[start].local_pwc_ledger;
1234  return photon_pwc_handle_looo(ledger, imm, comp, 1, src, cb, flag);
1235  }
1236 
1237  lead = DECODE_RCQ_32_LEAD(imm);
1238  scan_packed = (lead == PHOTON_EFLAG_PACK) ? 1 : 0;
1239  scan_ledger = (lead == PHOTON_EFLAG_LEDG) ? 1 : 0;
1240 
1241  // some backends have no ordering guarantees and will give us
1242  // the next entry location
1243  if (be->attr->comp_order & PHOTON_ORDER_NONE) {
1244  pos = DECODE_RCQ_32_CURR(imm);
1245  }
1246 
1247  rcq = 1;
1248  }
1249  // If we don't have remote completion support, must scan entire ledger
1250  else if (rc == PHOTON_EVENT_NOTIMPL) {
1251  start = 0;
1252  end = _photon_nproc;
1253  imm = UINT64_MAX;
1254  }
1255  else {
1256  return rc;
1257  }
1258  }
1259  else {
1260  // probe completions from a peer specified in the call
1261  start = proc;
1262  end = proc+1;
1263  }
1264 
1265  uint64_t offset, curr, new, left;
1266  for (i=start; i<end; i++) {
1267  // first we check the packed buffer space if necessary
1268  if (scan_packed) {
1270  curr = sync_load(&eb->curr, SYNC_RELAXED);
1271  if (pos >= 0) {
1272  offset = pos;
1273  }
1274  else {
1275  offset = curr & (eb->size - 1);
1276  left = eb->size - offset;
1277  if (left < PWC_MAX_PACKED) {
1278  new = left + curr;
1279  if (pos < 0)
1280  offset = 0;
1281  }
1282  else {
1283  new = curr;
1284  }
1285  }
1286  hdr = (photon_eb_hdr *)&(eb->data[offset]);
1287  if ((hdr->header == UINT8_MAX) && (hdr->footer == UINT8_MAX)) {
1288  photon_cid_hdr *cent = &hdr->cent;
1289  uintptr_t addr = hdr->addr;
1290  uint16_t size = hdr->length;
1291  uint64_t asize = ALIGN(EB_MSG_SIZE(size + cent->size), PWC_ALIGN);
1292  void *cidptr = (void*)(uintptr_t)hdr + sizeof(*hdr);
1293  void *payload = (void*)(uintptr_t)hdr + sizeof(*hdr) + cent->size;
1294  if (sync_cas(&eb->curr, &curr, new+asize, SYNC_RELAXED, SYNC_RELAXED)) {
1295  // now check for tail flag (or we could return to check later)
1296  volatile uint8_t *tail = (uint8_t*)(uintptr_t)((uintptr_t)hdr + asize - 1);
1297  while (*tail != UINT8_MAX)
1298  ;
1299  // check for PWC commands to process
1300  if (cent->type != PWC_CTYPE_USER) {
1301  photon_pwc_process_command(i, NULL, cent, cidptr, addr, size, payload);
1302  }
1303  else {
1304  memcpy((void*)addr, payload, size);
1305  comp->size = cent->size;
1306  comp->data = cidptr;
1307  *src = i;
1308  *flag = 1;
1309 
1310  if (cb) {
1311  cb(*comp);
1312  }
1313 
1314  // return 64 bit CIDs by value in out param, indicated by size 0
1315  if (comp->size == sizeof(comp->u64)) {
1316  comp->u64 = *(uint64_t*)comp->data;
1317  comp->size = 0;
1318  }
1319 
1320  dbg_trace("Copied message of size %u into 0x%016lx for request 0x%016lx",
1321  size, addr, comp->u64);
1322  }
1323 
1324  memset((void*)hdr, 0, asize);
1325 
1326  if (pos >= 0) {
1327  photon_pwc_handle_pooo(eb, i, offset, asize);
1328  }
1329  else {
1330  sync_store(&eb->prog, new+asize, SYNC_RELAXED);
1331  }
1332 
1333  goto exit;
1334  }
1335  }
1336  }
1337 
1338  if (scan_ledger) {
1339  // then check pwc ledger slots
1340  ledger = photon_processes[i].local_pwc_ledger;
1341  curr = sync_load(&ledger->curr, SYNC_RELAXED);
1342  if (pos >= 0) {
1343  offset = pos;
1344  }
1345  else {
1346  offset = curr & (ledger->num_entries - 1);
1347  }
1348  entry = (photonCIDRecvLedgerEntry)((uintptr_t)ledger->entries +
1349  (offset * PHOTON_CID_RECV_ENTRY_SIZE));
1350  if (entry->hdr.type != PWC_CTYPE_NULL &&
1351  sync_cas(&ledger->curr, &curr, curr+1, SYNC_RELAXED, SYNC_RELAXED)) {
1352  uint16_t type = entry->hdr.type;
1353  comp->size = entry->hdr.size;
1354  comp->data = (void*)entry->data;
1355  *src = i;
1356 
1357  // check for internal PWC commands to process
1358  int user = (type == PWC_CTYPE_USER) ? 1 : 0;
1359  if (!user) {
1360  photon_pwc_process_command(i, &entry->rhdr, &entry->hdr,
1361  entry->data, 0, 0, NULL);
1362  }
1363 
1364  // reset entry
1365  entry->hdr.type = PWC_CTYPE_NULL;
1366 
1367  // advance progress
1368  if (rcq) {
1369  photon_pwc_handle_looo(ledger, imm, comp, user, (int*)&offset, cb, flag);
1370  }
1371  else {
1372  photon_pwc_advance_ledger(ledger, comp, user, cb, flag);
1373  }
1374 
1375  goto exit;
1376  }
1377  }
1378  }
1379 
1380  if (rcq) {
1381  log_err("Missing RCQ lookup: %d, %d", scan_packed, scan_ledger);
1382  }
1383 
1384  return PHOTON_EVENT_NONE;
1385 
1386  exit:
1387  return PHOTON_EVENT_REQFOUND;
1388 }
1389 
1390 int _photon_probe_completion(int proc, int *flag, int *remaining,
1391  photon_cid *comp, int *src,
1392  void (*cb)(photon_cid), int flags) {
1393  int i, rc = PHOTON_EVENT_NONE;
1394 
1395  *flag = 0;
1396  *src = proc;
1397 
1398  // check local CQs
1399  if (flags & PHOTON_PROBE_EVQ) {
1400  rc = photon_pwc_probe_local(proc, flag, comp, cb);
1401  if (rc == PHOTON_EVENT_REQFOUND) {
1402  goto exit;
1403  }
1404  else if (rc == PHOTON_EVENT_ERROR) {
1405  goto error_exit;
1406  }
1407  }
1408 
1409  // check recv ledger
1410  if (flags & PHOTON_PROBE_LEDGER) {
1411  // first check any fabric completions
1412  rc = photon_pwc_probe_ledger(proc, flag, comp, src, cb);
1413  if (rc == PHOTON_EVENT_REQFOUND) {
1414  goto exit;
1415  }
1416  else if (rc == PHOTON_EVENT_ERROR) {
1417  goto error_exit;
1418  }
1419 
1420 #ifdef HAVE_SHMEM
1421  // then do shmem checks if no fabric events found
1422  int rcs, *peers, pcount;
1423 
1424  rcs = __photon_shmem->get_info(NULL, 0, (void**)&peers, &pcount, PHOTON_PEERS);
1425  if (rcs != PHOTON_OK) {
1426  dbg_warn("Could not get shmem peers");
1427  }
1428 
1429  for (i=0; i<pcount; i++) {
1430  rc = photon_pwc_probe_ledger(peers[i], flag, comp, src, cb);
1431  if (rc == PHOTON_EVENT_REQFOUND) {
1432  goto exit;
1433  }
1434  else if (rc == PHOTON_EVENT_ERROR) {
1435  goto error_exit;
1436  }
1437  }
1438 #endif
1439  }
1440 
1441  // process any queued requests, only when EVQ requested
1442  if ((rc == PHOTON_EVENT_NONE) && (flags & PHOTON_PROBE_EVQ)) {
1443  uint32_t cnt = sync_load(&qcount, SYNC_RELAXED);
1444  if (cnt) {
1445  for (i=0; i<_photon_nproc; i++) {
1446  photon_pwc_process_queued_gwc(i, photon_processes[i].request_table);
1447  photon_pwc_process_queued_pwc(i, photon_processes[i].request_table);
1448  }
1449  }
1450 
1451  // also check if any collectives have completed
1452  rc = photon_coll_probe(proc, flag, comp);
1453  if (rc == PHOTON_EVENT_REQFOUND) {
1454  goto exit;
1455  }
1456  }
1457 
1458  exit:
1459  if (remaining) {
1460  *remaining = photon_count_request(proc);
1461  dbg_trace("%d requests remaining", *remaining);
1462  }
1463 
1464  return PHOTON_OK;
1465 
1466  error_exit:
1467  return PHOTON_ERROR;
1468 }
1469 
int _photon_get_with_completion(int proc, uint64_t size, photonBuffer lbuf, photonBuffer rbuf, photon_cid local, photon_cid remote, int flags, pwc_cid_type type, pwc_command cmd)
Definition: photon_pwc.c:854
photonLedger remote_pwc_ledger
int __photon_get_event(int proc, photon_rid *id)
Definition: photon_event.c:87
Convenience pointer type for the buffer structure.
Definition: photon.h:105
int __photon_get_revent(int proc, photon_rid *id, uint64_t *imm)
Definition: photon_event.c:139
uint32_t size
Size 0 indicates u64 field is used, otherwise data pointer is used.
Definition: photon.h:83
#define RDMA_FLAG_NIL
photonRequest photon_lookup_request(photon_rid rid)
#define PHOTON_EVENT_REQCOMP
Definition: photon_event.h:26
#define PHOTON_ORDER_NONE
Do not assume anything about the ordering of completion IDs.
Definition: photon_config.h:28
volatile uint8_t header
Definition: photon_pwc.h:47
The Photon completion ID used by the PWC API.
Definition: photon.h:78
photonLedger local_pwc_ledger
pwc_cid_type
Definition: photon_pwc.h:29
photonBufferHandle shared_storage
photonRequest photon_pwc_pop_req(int proc)
Definition: photon_pwc.c:755
int photon_count_request(int proc)
int _LEDGER_SIZE
Definition: libphoton.c:53
uintptr_t addr
Definition: photon_pwc.h:49
#define PWC_ALIGN
Definition: photon_pwc.h:25
int __photon_handle_cq_event(photonRequest req, photon_rid cookie, photonRequest *rreq)
Definition: photon_event.c:173
struct photon_buffer_handle_t * photonBufferHandle
int photon_pwc_register_ctype_handler(pwc_cid_type type, void *handler)
Definition: photon_pwc.c:743
photonRequest photon_setup_request_direct(photonBuffer lbuf, photonBuffer rbuf, uint64_t size, int proc, int events)
int _photon_nproc
Definition: libphoton.c:55
int photon_coll_probe(int proc, int *flag, photon_cid *comp)
Definition: photon_coll.c:73
photonBackend __photon_shmem
Definition: libphoton.c:49
photonRequestTable request_table
struct photon_cid_ledger_entry_t * photonCIDLedgerEntry
struct photon_req_table_t * photonRequestTable
uint64_t photon_rid
The Photon request ID.
Definition: photon.h:75
photonBackend __photon_fabric
Definition: libphoton.c:50
#define ENCODE_RCQ_32(t, l, f, c, p)
Definition: photon_event.h:39
#define PHOTON_ANY_SOURCE
RNDV and PWC flag: return completions from any source.
Definition: photon.h:54
photonEagerBuf local_pwc_buf
#define PHOTON_EFLAG_LEDG
Definition: photon_event.h:30
int _photon_pbsize
Definition: libphoton.c:59
volatile uint8_t footer
Definition: photon_pwc.h:51
#define REQUEST_FLAG_1PWC
pwc_command
Definition: photon_pwc.h:37
int _photon_upsize
Definition: libphoton.c:61
#define PHOTON_EFLAG_ONE
Definition: photon_event.h:33
struct photon_buffer_priv_t priv
The associated private buffer information.
Definition: photon.h:108
#define REQUEST_COOK_GPWC
photon_cid_hdr cent
Definition: photon_pwc.h:48
#define PHOTON_EVENT_NOTIMPL
Definition: photon_event.h:28
int _photon_myrank
Definition: libphoton.c:54
#define RDMA_FLAG_WITH_IMM
#define EB_MSG_SIZE(s)
Definition: photon_pwc.h:54
volatile uint16_t type
#define PHOTON_REQ_PWC_NO_RCE
PWC flag: do not send a remote completion ID.
Definition: photon.h:44
photonBackend backend
int photon_rdma_eager_buf_get_offset(int proc, photonEagerBuf buf, int size, int lim)
uint64_t key0
Definition: photon.h:99
#define DECODE_RCQ_32_CURR(v)
Definition: photon_event.h:45
struct photon_req_t * photonRequest
ProcessInfo * photon_processes
#define NULL_REQUEST
#define DECODE_RCQ_32_TYPE(v)
Definition: photon_event.h:48
#define PHOTON_EVENT_NONE
Definition: photon_event.h:25
#define PHOTON_ERROR
Error code, general error.
Definition: photon.h:32
int photon_free_request(photonRequest req)
void * data
Pointer to user-defined completion ID data.
Definition: photon.h:81
uint64_t size
The size of the buffer in bytes.
Definition: photon.h:107
#define PHOTON_ETYPE_DATA
Definition: photon_event.h:36
#define PHOTON_REQ_PWC_NO_LCE
PWC flag: probe will not return a local completion ID.
Definition: photon.h:43
photonEagerBuf remote_pwc_buf
#define PHOTON_ETYPE_NTFY
Definition: photon_event.h:37
#define PHOTON_OK
Photon success code.
Definition: photon.h:30
int photon_rdma_ledger_get_next(int proc, photonLedger l)
int _photon_put_with_completion(int proc, uint64_t size, photonBuffer lbuf, photonBuffer rbuf, photon_cid local, photon_cid remote, int flags, pwc_cid_type type, pwc_command cmd)
Definition: photon_pwc.c:759
#define REQUEST_FLAG_CMD
#define PHOTON_EFLAG_PACK
Definition: photon_event.h:31
int buffertable_find_containing(void *addr, uint64_t size, photonBufferHandle *result)
#define REQUEST_FLAG_NO_LCE
uint64_t u64
Unsigned 64b representation of the ID.
Definition: photon.h:80
#define PHOTON_ERROR_RESOURCE
Error code, resource not available.
Definition: photon.h:33
int _photon_idsize
Definition: libphoton.c:63
struct photon_rdma_ledger_t * photonLedger
#define REQUEST_FLAG_NO_RCE
#define REQUEST_FLAG_ROP
#define PWC_MAX_PACKED
Definition: photon_pwc.h:55
#define DECODE_RCQ_32_LEAD(v)
Definition: photon_event.h:47
struct photon_cid_recv_ledger_entry_t * photonCIDRecvLedgerEntry
#define PHOTON_EFLAG_TWO
Definition: photon_event.h:34
#define PHOTON_EVENT_ERROR
Definition: photon_event.h:24
#define REQUEST_OP_PWC
int photon_pwc_init(photonConfig cfg)
Definition: photon_pwc.c:716
uint16_t length
Definition: photon_pwc.h:50
#define PHOTON_CID_RECV_ENTRY_SIZE
#define PHOTON_PROBE_LEDGER
PWC probe flag: return only ledger completion IDs.
Definition: photon.h:49
int photon_pwc_add_req(photonRequest req)
Definition: photon_pwc.c:749
#define DECODE_RCQ_32_FLAG(v)
Definition: photon_event.h:46
int _photon_probe_completion(int proc, int *flag, int *remaining, photon_cid *comp, int *src, void(*cb)(photon_cid), int flags)
Definition: photon_pwc.c:1390
#define PHOTON_EVENT_REQFOUND
Definition: photon_event.h:27
#define PHOTON_PROBE_EVQ
PWC probe flag: return only local (event queue) IDs.
Definition: photon.h:48
#define PHOTON_EVENT_OK
Definition: photon_event.h:23
uintptr_t addr
The base address of the buffer.
Definition: photon.h:106
#define DECODE_RCQ_32_PROC(v)
Definition: photon_event.h:44
#define REQUEST_FLAG_2PWC
struct photon_rdma_eager_buf_t * photonEagerBuf