photon  1.1
photon_backend.c
Go to the documentation of this file.
1 // =============================================================================
2 // Photon RDMA Library (libphoton)
3 //
4 // Copyright (c) 2016, Trustees of Indiana University,
5 // All rights reserved.
6 //
7 // This software may be modified and distributed under the terms of the BSD
8 // license. See the COPYING file for details.
9 //
10 // This software was created at the Indiana University Center for Research in
11 // Extreme Scale Technologies (CREST).
12 // =============================================================================
13 
14 #include <stdlib.h>
15 #include <string.h>
16 #include <stdbool.h>
17 #include <unistd.h>
18 #include <inttypes.h>
19 #include <pthread.h>
20 #include <arpa/inet.h>
21 
22 #include "libphoton.h"
23 #include "photon_backend.h"
24 #include "photon_forwarder.h"
25 #include "photon_buffertable.h"
26 #include "photon_exchange.h"
27 #include "photon_pwc.h"
28 #include "photon_coll.h"
29 #include "photon_request.h"
30 #include "photon_event.h"
31 
34 
// Queue of buffer registrations requested before the library was initialized.
// Entries are pushed by _photon_register_buffer() and drained near the end of
// _photon_init().
35 SLIST_HEAD(pendingmemregs, photon_mem_register_req) pending_mem_register_list;
36 
// Message buffers created in _photon_init() when the verbs backend is
// configured with the UD service (cfg->ibv.use_ud).
37 static photonMsgBuf sendbuf;
38 static photonMsgBuf recvbuf;
39 
40 int _photon_initialized() {
42  return __photon_fabric->initialized();
43  else if (__photon_shmem)
44  return __photon_shmem->initialized();
45  else
46  return PHOTON_ERROR_NOINIT;
47 }
48 
// Bring up the photon library for this process.
//
// Visible steps, in order: create the buffer table, allocate and zero the
// per-process table, initialize the request tables and PWC, allocate one
// page-aligned region holding all ledgers and eager/pwc buffers, carve that
// region into the individual ledgers, initialize the shmem and/or fabric
// backend, optionally create and pre-post the UD message buffers, exchange
// ledger addresses with peers, initialize collectives, and finally register
// any buffers that were queued before init.
//
// cfg  - configuration; cap/attr/backend/ibv fields are read below.
// info - not referenced in the lines visible here.
// ss   - not referenced in the lines visible here.
//
// Returns PHOTON_OK on success and PHOTON_ERROR on any failure.
//
// NOTE(review): this listing is missing several original lines (148, 202, 209,
// 232, 237, 266, 273); the notes below mark where they fall. Confirm against
// the real source before changing any logic here.
49 int _photon_init(photonConfig cfg, ProcessInfo *info, photonBufferHandle ss) {
50  int i, rc;
51  char *buf;
52  int bufsize;
53  int info_ledger_size, fin_ledger_size, pwc_ledger_size, eager_ledger_size;
54  int eager_bufsize, pwc_bufsize;
55 
56  srand48(getpid() * time(NULL));
57 
58  dbg_trace("(nproc %d, rank %d)",_photon_nproc, _photon_myrank);
59  one_debug("num ledgers:\t\t%d", _LEDGER_SIZE);
60  one_debug("num CQs:\t\t%d", cfg->cap.num_cq);
61  one_debug("eager buf size:\t\t%d", _photon_ebsize);
62  one_debug("small msg size:\t\t%d", _photon_smsize);
63  one_debug("pwc buf size:\t\t%d", _photon_pbsize);
64  one_debug("small pwc size:\t\t%d", _photon_upsize);
65  one_debug("max cid size:\t\t%d", _photon_idsize);
66  one_debug("init req per rank:\t%d", cfg->cap.default_rd);
67  one_debug("completion order:\t%s", PHOTON_ORDER_TO_STRING[cfg->attr.comp_order]);
68 
69  if (buffertable_init(193)) {
70  log_err("Failed to allocate buffer table");
71  goto error_exit;
72  }
73  dbg_trace("Allocated buffertable");
74 
75  photon_processes = (ProcessInfo *) malloc(sizeof(ProcessInfo) * (_photon_nproc + _photon_nforw));
76  if (!photon_processes) {
77  log_err("Could not allocate process information");
78  goto error_exit_bt;
79  }
80 
81  // Set it to zero, so that we know if it ever got initialized
82  memset(photon_processes, 0, sizeof(ProcessInfo) * (_photon_nproc + _photon_nforw));
83 
84  dbg_trace("Allocated and cleared process info");
85 
86  // initialize the request tables
// NOTE(review): this and the PWC failure below jump to error_exit_bt, which
// sits after the free(photon_processes) under error_exit_crb, so the process
// table allocated above is not released on these paths.
87  if (photon_request_init(cfg) != PHOTON_OK) {
88  goto error_exit_bt;
89  }
90 
91  // initialize PWC
92  if (photon_pwc_init(cfg) != PHOTON_OK) {
93  goto error_exit_bt;
94  }
95 
96  dbg_trace("Allocated request structures");
97 
98  // Ledgers are x2 cause we need a local and a remote copy of each ledger.
99  // Info ledger has an additional x2 cause we need "send-info" and "receive-info" ledgers.
100  info_ledger_size = 2 * 2 * PHOTON_NP_INFO_SIZE;
101  fin_ledger_size = 2 * PHOTON_NP_LEDG_SIZE;
102  pwc_ledger_size = PHOTON_NP_RPEDG_SIZE + PHOTON_NP_PEDG_SIZE;
103  eager_ledger_size = 2 * PHOTON_NP_LEDG_SIZE;
104  eager_bufsize = 2 * PHOTON_NP_EBUF_SIZE;
105  pwc_bufsize = 2 * PHOTON_NP_PBUF_SIZE;
106 
107  bufsize = info_ledger_size + fin_ledger_size + pwc_ledger_size + eager_ledger_size;
108  bufsize += (eager_bufsize + pwc_bufsize);
109 
// One page-aligned allocation backs every ledger and eager/pwc buffer.
110  rc = posix_memalign((void**)&buf, getpagesize(), bufsize);
111  if (rc || !buf) {
112  log_err("Couldn't allocate ledgers");
113  goto error_exit_crb;
114  }
115 
116  one_debug("shared buffer size:\t%d", bufsize);
117 
// Each setup call below is handed its own offset into buf via a PHOTON_L*_PTR macro.
118  if (photon_setup_ri_ledger(photon_processes, PHOTON_LRI_PTR(buf), _LEDGER_SIZE) != 0) {
119  log_err("couldn't setup snd/rcv info ledgers");
120  goto error_exit_buf;
121  }
122 
123  if (photon_setup_fin_ledger(photon_processes, PHOTON_LF_PTR(buf), _LEDGER_SIZE) != 0) {
124  log_err("couldn't setup send ledgers");
125  goto error_exit_buf;
126  }
127 
128  if (photon_setup_pwc_ledger(photon_processes, PHOTON_LP_PTR(buf), _LEDGER_SIZE) != 0) {
129  log_err("couldn't setup send ledgers");
130  goto error_exit_buf;
131  }
132 
133  if (photon_setup_eager_ledger(photon_processes, PHOTON_LE_PTR(buf), _LEDGER_SIZE) != 0) {
134  log_err("couldn't setup eager ledgers");
135  goto error_exit_buf;
136  }
137 
138  if (photon_setup_eager_buf(photon_processes, PHOTON_LEB_PTR(buf), _photon_ebsize) != 0) {
139  log_err("couldn't setup eager buffers");
140  goto error_exit_buf;
141  }
142 
143  if (photon_setup_pwc_buf(photon_processes, PHOTON_LPB_PTR(buf), _photon_pbsize) != 0) {
144  log_err("couldn't setup pwc eager buffers");
145  goto error_exit_buf;
146  }
147 
// NOTE(review): listing line 148 is missing here; presumably it assigns
// shared_storage (a buffer object wrapping buf) - confirm against the source.
149  if (!shared_storage) {
150  log_err("Couldn't register shared storage");
151  goto error_exit_buf;
152  }
153 
154  if (__photon_shmem) {
155  // initialize the shared memory backend
156  rc = __photon_shmem->init(cfg, photon_processes);
157  if (rc != PHOTON_OK) {
158  log_err("Could not initialize photon shared memory");
159  goto error_exit_ss;
160  }
161  }
162 
163  if (__photon_fabric) {
164  // initialize the fabric backend
165  rc = __photon_fabric->init(cfg, photon_processes);
166  if (rc != PHOTON_OK) {
167  log_err("Could not initialize photon fabric backend");
168  goto error_exit_ss;
169  }
170 
171  // XXX: move this to verbs backend, why is this even here!
172  // allocate buffers for UD send/recv operations (after backend initializes)
173  uint64_t msgbuf_size, p_size;
174  int p_offset, p_hsize;
175  if ((cfg->backend == PHOTON_BACKEND_VERBS) && cfg->ibv.use_ud) {
176  // we need to ask the backend about the max msg size it can support for UD
177  int *mtu, size;
178  if (__photon_fabric->get_info(photon_processes, PHOTON_ANY_SOURCE, (void**)&mtu, &size, PHOTON_MTU)) {
179  dbg_err("Could not get mtu for UD service");
180  goto error_exit_ss;
181  }
182  // gross hack, should ask backend again...
183  p_offset = 40;
184  p_hsize = sizeof(photon_ud_hdr);
185  p_size = p_offset + *mtu - 84;
186 
187  //else {
188  // p_offset = 0;
189  // p_hsize = 0;
190  // p_size = p_offset + _photon_smsize;
191 
192  dbg_trace("sr partition size: %lu", p_size);
193 
194  // create enough space to accommodate every rank sending _LEDGER_SIZE messages
195  msgbuf_size = _LEDGER_SIZE * p_size * _photon_nproc;
196 
197  sendbuf = photon_msgbuffer_new(msgbuf_size, p_size, p_offset, p_hsize);
198  if (!sendbuf) {
199  dbg_err("could not create send message buffer");
200  goto error_exit_ss;
201  }
// NOTE(review): listing line 202 is missing here.
203 
204  recvbuf = photon_msgbuffer_new(msgbuf_size, p_size, p_offset, p_hsize);
205  if (!recvbuf) {
206  dbg_err("could not create recv message buffer");
207  goto error_exit_sb;
208  }
// NOTE(review): listing line 209 is missing here.
210 
211  // pre-post the receive buffers when UD service is requested
// The cookie packs REQUEST_COOK_RECV in the upper 32 bits and the partition index in the lower bits.
212  for (i = 0; i < recvbuf->p_count; i++) {
213  __photon_fabric->rdma_recv(NULL, (uintptr_t)recvbuf->entries[i].base,
214  recvbuf->p_size, &recvbuf->db->bint.buf,
215  (( (uint64_t)REQUEST_COOK_RECV) << 32) | i, 0);
216  }
217  }
218  }
219 
220  // should never trigger this assert if all is well
221  assert(__photon_shmem || __photon_fabric);
222 
223  // if backend not set by init, default to current enabled fabric
224  for (i=0; i<_photon_nproc; i++) {
225  if (photon_processes[i].backend == NULL) {
226  photon_processes[i].backend = __photon_fabric;
227  }
228  }
229 
230  // we can safely register our shared ledger space once the
231  // fabric and shmem modules are initialized
// NOTE(review): listing line 232 (the condition that opens this block) is
// missing. This failure and the ledger-exchange failure below jump straight
// to error_exit, bypassing every cleanup label.
233  log_err("Could not register local buffer for the ledger entries");
234  goto error_exit;
235  }
236 
// NOTE(review): listing line 237 is missing here.
238 
239  // exchange shared ledger addresses
240  if (photon_exchange_ledgers(photon_processes, LEDGER_ALL) != PHOTON_OK) {
241  log_err("Could not exchange ledgers");
242  goto error_exit;
243  }
244 
245  // initialize the collective interface
246  if (photon_coll_init(cfg) != PHOTON_OK) {
247  goto error_exit_bt;
248  }
249 
250  // register any buffers that were requested before init
// NOTE(review): the dequeued mem_reg_req node is not freed in the visible
// lines and the return value of _photon_register_buffer() is ignored.
251  while( !SLIST_EMPTY(&pending_mem_register_list) ) {
252  struct photon_mem_register_req *mem_reg_req;
253  dbg_trace("registering buffer in queue");
254  mem_reg_req = SLIST_FIRST(&pending_mem_register_list);
255  SLIST_REMOVE_HEAD(&pending_mem_register_list, list);
256  _photon_register_buffer(mem_reg_req->buffer, mem_reg_req->buffer_size);
257  }
258 
259  dbg_trace("ended successfully =============");
260 
261  return PHOTON_OK;
262 
// Cleanup labels fall through from the most recently acquired resource to the earliest.
263  error_exit_sb:
264  photon_msgbuffer_free(sendbuf);
265  error_exit_ss:
// NOTE(review): listing line 266 is missing (presumably releases shared_storage).
267  error_exit_buf:
268  if (buf)
269  free(buf);
270  error_exit_crb:
271  free(photon_processes);
272  error_exit_bt:
// NOTE(review): listing line 273 is missing (presumably tears down the buffer table).
274  error_exit:
275 
276  return PHOTON_ERROR;
277 }
278 
279 int _photon_cancel(photon_rid request, int flags) {
280  photonRequest req;
281 
282  if (flags & PHOTON_REQ_SHUTDOWN) {
283 
284  }
285  else {
286  if ((req = photon_lookup_request(request)) != NULL) {
287  dbg_err("Could not find request to cancel: 0x%016lx", request);
288  return PHOTON_ERROR;
289  }
290  while (!(req->flags & REQUEST_FLAG_LDONE))
292  photon_free_request(req);
293  }
294 
295  return PHOTON_OK;
296 }
297 
// NOTE(review): the function header (listing line 298) and listing lines 301
// and 303 are missing from this view; the routine is presumably
// _photon_finalize() - confirm against the real source.
//
// Visible behaviour: frees the memory behind shared_storage, then finalizes
// the shmem backend and the fabric backend when each is present. Returns
// PHOTON_ERROR as soon as either finalize hook fails (a shmem failure
// therefore skips the fabric finalize), otherwise PHOTON_OK.
299  int rc;
300 
302  free((void*)shared_storage->bint.buf.addr);
304 
305  if (__photon_shmem) {
306  rc = __photon_shmem->finalize();
307  if (rc != PHOTON_OK) {
308  log_err("Could not finalize shmem");
309  return PHOTON_ERROR;
310  }
311  }
312 
313  if (__photon_fabric) {
314  rc = __photon_fabric->finalize();
315  if (rc != PHOTON_OK) {
316  log_err("Could not finalize fabric");
317  return PHOTON_ERROR;
318  }
319  }
320 
321  return PHOTON_OK;
322 }
323 
324 int _photon_register_buffer(void *addr, uint64_t size) {
325  static int first_time = 1;
327 
328  dbg_trace("(%p, %lu)", addr, size);
329 
330  if (_photon_initialized() != PHOTON_OK) {
331  struct photon_mem_register_req *mem_reg_req;
332  if( first_time ) {
333  SLIST_INIT(&pending_mem_register_list);
334  first_time = 0;
335  }
336  mem_reg_req = malloc( sizeof(struct photon_mem_register_req) );
337  mem_reg_req->buffer = addr;
338  mem_reg_req->buffer_size = size;
339 
340  SLIST_INSERT_HEAD(&pending_mem_register_list, mem_reg_req, list);
341  dbg_trace("called before init, queueing buffer info");
342  goto normal_exit;
343  }
344 
345  if (buffertable_find_exact(addr, size, &db) == 0) {
346  dbg_trace("we had an existing buffer, reusing it");
347  db->ref_count++;
348  goto normal_exit;
349  }
350 
351  db = photon_buffer_create(addr, size, BUFFER_FLAG_NIL);
352  if (!db) {
353  log_err("could not create photon buffer");
354  goto error_exit;
355  }
356 
357  dbg_trace("created buffer: %p", db);
358 
359  if (photon_buffer_register(db, BUFFER_FLAG_NIL) != 0) {
360  log_err("Couldn't register buffer of size %lu at %p", size, addr);
361  goto error_exit_db;
362  }
363 
364  dbg_trace("registered buffer");
365 
366  if (buffertable_insert(db) != 0) {
367  goto error_exit_db;
368  }
369 
370  dbg_trace("added buffer to table");
371 
372 normal_exit:
373  return PHOTON_OK;
374 error_exit_db:
375  photon_buffer_free(db);
376 error_exit:
377  return PHOTON_ERROR;
378 }
379 
380 int _photon_unregister_buffer(void *addr, uint64_t size) {
382 
383  dbg_trace("(%p, %llu)", addr, size);
384 
385  CHECK_INIT();
386 
387  if (buffertable_find_exact(addr, size, &db) != 0) {
388  dbg_err("no such buffer is registered at %p", addr);
389  goto error_exit;
390  }
391 
392  if (--(db->ref_count) == 0) {
393  if (photon_buffer_unregister(db) != 0) {
394  goto error_exit;
395  }
396  buffertable_remove(db);
397  photon_buffer_free(db);
398  }
399 
400  return PHOTON_OK;
401 
402 error_exit:
403  return PHOTON_ERROR;
404 }
405 
407 // photon_test() is a nonblocking operation that checks the event queue to see if
408 // the event associated with the "request" parameter has completed. It returns:
409 // 0 if the event associated with "request" was in the queue and was successfully popped.
410 // 1 if "request" was not in the request tables. This is not an error if photon_test()
411 // is called in a loop and is called multiple times for each request.
412 // -1 if an error occurred.
413 //
414 // When photon_test() returns zero (success) the "flag" parameter has the value:
415 // 0 if the event that was popped does not correspond to "request", or if none of the operations completed.
416 // 1 if the event that was popped does correspond to "request".
417 //
418 // When photon_test() returns 0 and flag==1 the "status" structure is also filled
419 //
420 // Regardless of the return value and the value of "flag", the parameter "type"
421 // will be set to 0 (zero) when the request is of type event and 1 (one) when the
422 // request is of type ledger. type is set to 2 when the request was a send/recv
// NOTE(review): when the request is not found the code returns before "type"
// is written, and "status" is dereferenced without a NULL check on completion.
423 int _photon_test(photon_rid request, int *flag, int *type, photonStatus status) {
424  photonRequest req;
425  int ret_val;
426 
427  dbg_trace("(0x%016lx)", request);
428 
429  req = photon_lookup_request(request);
430  if (req == NULL) {
431  dbg_warn("Request (id=0x%016lx) could not be found", request);
432  // Unlike photon_wait(), we might call photon_test() multiple times on a request,
433  // e.g., in an unguarded loop. flag==-1 will signify that the operation is
434  // not pending. This means, it might be completed, it might have never been
435  // issued. It's up to the application to guarantee correctness, by keeping
436  // track, of what's going on. Unless you know what you are doing, consider
437  // (flag==-1 && return_value==1) to be an error case.
438  dbg_trace("returning 1, flag:-1");
439  *flag = -1;
440  return 1;
441  }
442 
443  *flag = 0;
444 
// Pick the nonblocking pop routine that matches the request type and report
// the type to the caller when a "type" pointer was supplied.
445  switch (req->type) {
446  case LEDGER:
447  {
448  if( type != NULL ) *type = 1;
449  ret_val = __photon_nbpop_ledger(req);
450  }
451  break;
452  case SENDRECV:
453  {
454  if( type != NULL ) *type = 2;
455  ret_val = __photon_nbpop_sr(req);
456  }
457  break;
458  default:
459  {
460  if( type != NULL ) *type = 0;
461  ret_val = __photon_nbpop_event(req);
462  }
463  break;
464  }
465 
// Translate the pop result into the documented (return value, flag) pair.
466  switch (ret_val) {
467  case PHOTON_EVENT_REQCOMP:
468  {
469  *flag = 1;
470  status->src_addr.global.proc_id = req->proc;
471  status->tag = req->tag;
472  status->size = req->size;
473  status->count = 1;
474  status->error = 0;
// requests flagged REQUEST_FLAG_WFIN are kept alive; all others are released here
475  if (!(req->flags & REQUEST_FLAG_WFIN))
476  photon_free_request(req);
477  dbg_trace("returning 0, flag:1");
478  return 0;
479  }
480  break;
// NOTE(review): listing line 481 is missing here (possibly another case label).
482  case PHOTON_EVENT_NONE:
483  case PHOTON_EVENT_OK:
484  {
485  dbg_trace("returning 0, flag:0");
486  *flag = 0;
487  return 0;
488  }
489  break;
490  case PHOTON_EVENT_ERROR:
491  {
492  dbg_trace("returning -1, flag:0");
493  *flag = 0;
494  return -1;
495  }
496  break;
497  default:
498  *flag = 0;
499  dbg_warn("Reached default case");
500  return -1;
501  break;
502  }
503 
504  return -1;
505 }
506 
507 int _photon_wait(photon_rid request) {
508  photonRequest req;
509  int rc;
510 
511  dbg_trace("(0x%016lx)", request);
512 
513  req = photon_lookup_request(request);
514  if (req == NULL) {
515  log_err("Wrong request value, operation not in table");
516  return PHOTON_ERROR;
517  }
518 
519  if (req->state == REQUEST_FREE) {
520  dbg_warn("Request 0x%016lx is already free!", req->id);
521  return PHOTON_OK;
522  }
523 
524  if (req->type == LEDGER)
525  rc =__photon_wait_ledger(req);
526  else
527  rc = __photon_wait_event(req);
528 
529  if (rc == PHOTON_OK) {
530  photon_free_request(req);
531  return PHOTON_OK;
532  }
533 
534  return PHOTON_ERROR;
535 }
536 
537 int _photon_send(photonAddr addr, void *ptr, uint64_t size, int flags, photon_rid *request) {
538  return PHOTON_OK;
539 }
540 
541 int _photon_recv(photon_rid request, void *ptr, uint64_t size, int flags) {
542  return PHOTON_OK;
543 }
544 
545 int _photon_post_recv_buffer_rdma(int proc, void *ptr, uint64_t size, int tag, photon_rid *request) {
546  photonBackend be = photon_processes[proc].backend;
548  photonRequest req;
549  photonRILedgerEntry entry;
550  uintptr_t rmt_addr;
551  int curr, rc;
552 
553  dbg_trace("(%d, %p, %lu, %d, %p)", proc, ptr, size, tag, request);
554 
555  if (buffertable_find_containing( (void *) ptr, size, &db) != 0) {
556  log_err("Requested recv from ptr not in table");
557  goto error_exit;
558  }
559 
560  req = photon_get_request(proc);
561  if (!req) {
562  log_err("Could not get request descriptor for proc %d", proc);
563  goto error_exit;
564  }
565 
566  // photon_post_recv_buffer_rdma() initiates a receiver initiated handshake. For this reason,
567  // we don't care when the function is completed, but rather when the transfer associated with
568  // this handshake is completed. This will be reflected in the LEDGER by the corresponding
569  // photon_send_FIN() posted by the sender.
570  req->state = REQUEST_PENDING;
571  req->op = REQUEST_OP_RECVBUF;
572  req->type = LEDGER;
573  req->proc = proc;
574  req->tag = tag;
575  req->size = size;
576  req->rattr.events = 1;
577  req->rattr.cookie = req->id;
578 
579  if (request != NULL) {
580  *request = req->id;
581  }
582  else {
583  log_warn("request == NULL, could not return request ID: 0x%016lx", req->id);
584  }
585 
586  /* proc == -1 means ANY_SOURCE. In this case all potential senders must post a send request
587  which will write into our snd_info ledger entries such that:
588  rkey == 0
589  addr == (uintptr_t)0 */
590  if( proc == PHOTON_ANY_SOURCE ) {
591  proc = photon_wait_send_request_rdma(tag);
592  }
593 
594  curr = photon_ri_ledger_get_next(proc, photon_processes[proc].remote_rcv_info_ledger);
595  if (curr < 0) {
596  if (curr == -2) {
597  return PHOTON_ERROR_RESOURCE;
598  }
599  goto error_exit;
600  }
601  dbg_trace("New curr (proc=%d): %u", proc, curr);
602  entry = &photon_processes[proc].remote_rcv_info_ledger->entries[curr];
603 
604  rmt_addr = photon_processes[proc].remote_rcv_info_ledger->remote.addr;
605  rmt_addr += curr * sizeof(*entry);
606 
607  /* fill in what we're going to transfer */
608  entry->header = 1;
609  entry->request = req->id;
610  entry->tag = tag;
611  entry->addr = (uintptr_t)ptr;
612  entry->size = size;
613  entry->priv = db->bint.buf.priv;
614  entry->footer = 1;
615 
616  dbg_trace("Post recv");
617  dbg_trace("Request: 0x%016lx", entry->request);
618  dbg_trace("Address: %p", (void *)entry->addr);
619  dbg_trace("RAddress: %p", (void *)rmt_addr);
620  dbg_trace("Size: %lu", entry->size);
621  dbg_trace("Tag: %d", entry->tag);
622  dbg_trace("Keys: 0x%016lx / 0x%016lx", entry->priv.key0, entry->priv.key1);
623 
624  {
625  rc = be->rdma_put(proc, (uintptr_t)entry, rmt_addr, sizeof(*entry), &(shared_storage->bint.buf),
626  &(photon_processes[proc].remote_rcv_info_ledger->remote), NULL_REQUEST,
627  0, RDMA_FLAG_NIL);
628  if (rc != PHOTON_OK) {
629  dbg_err("RDMA PUT failed for 0x%016lx", req->id);
630  goto error_exit;
631  }
632  }
633 
634  return PHOTON_OK;
635 
636 error_exit:
637  if (request != NULL) {
638  *request = NULL_REQUEST;
639  }
640  return PHOTON_ERROR;
641 }
642 
643 int _photon_try_eager(int proc, void *ptr, uint64_t size, int tag,
644  photon_rid *request, photonBufferHandle db) {
645  if (size <= _photon_smsize) {
646  photonBackend be = photon_processes[proc].backend;
647  photonRequest req;
648  uintptr_t rmt_addr, eager_addr;
649  photon_rid eager_cookie;
650  photonEagerBuf eb;
651  photonLedger l;
652  photonRDMALedgerEntry entry, entries;
653  int offset, rc, curr;
654 
655  eb = photon_processes[proc].remote_eager_buf;
656  offset = photon_rdma_eager_buf_get_offset(proc, eb, size, size);
657  if (offset < 0) {
658  if (offset == -2) {
659  dbg_trace("Exceeding known receiver eager buf progress!");
660  return PHOTON_ERROR_RESOURCE;
661  }
662  else {
663  goto error_exit;
664  }
665  }
666 
667  l = photon_processes[proc].remote_eager_ledger;
668  curr = photon_rdma_ledger_get_next(proc, l);
669  if (curr < 0) {
670  if (offset == -2) {
671  dbg_trace("Exceeding known receiver eager ledger progress!");
672  return PHOTON_ERROR_RESOURCE;
673  }
674  goto error_exit;
675  }
676 
677  req = photon_get_request(proc);
678  if (!req) {
679  log_err("Could not get request descriptor for proc %d", proc);
680  goto error_exit;
681  }
682  // photon_post_send_buffer_rdma() initiates a sender initiated handshake.For this reason,
683  // we don't care when the function is completed, but rather when the transfer associated with
684  // this handshake is completed. This will be reflected in the LEDGER by the corresponding
685  // photon_send_FIN() posted by the receiver.
686  req->state = REQUEST_PENDING;
687  req->op = REQUEST_OP_SENDBUF;
688  req->flags = REQUEST_FLAG_EAGER;
689  req->type = LEDGER;
690  req->proc = proc;
691  req->tag = tag;
692  req->size = size;
693  req->rattr.events = 1;
694  req->rattr.cookie = req->id;
695 
696  if (request != NULL) {
697  *request = req->id;
698  }
699  else {
700  log_warn("request == NULL, could not return request ID: 0x%016lx", req->id);
701  }
702 
703  eager_addr = (uintptr_t)eb->remote.addr + offset;
704  eager_cookie = (( (uint64_t)REQUEST_COOK_EAGER)<<32) | (req->id<<32)>>32;
705 
706  dbg_trace("EAGER PUT of size %lu to addr: 0x%016lx", size, eager_addr);
707 
708  rc = be->rdma_put(proc, (uintptr_t)ptr, eager_addr, size, &(db->bint.buf),
709  &eb->remote, eager_cookie, 0, RDMA_FLAG_NIL);
710 
711  if (rc != PHOTON_OK) {
712  dbg_err("RDMA EAGER PUT failed for 0x%016lx", eager_cookie);
713  goto error_exit;
714  }
715 
716  dbg_trace("new eager curr == %d", curr);
717  rmt_addr = l->remote.addr + (curr * sizeof(*entry));
718 
719  entries = l->entries;
720  entry = &entries[curr];
721  // encode the eager size and request id in the eager ledger
722  entry->request = (size<<32) | (req->id<<32>>32);
723 
724  dbg_trace("Updating remote eager ledger address: 0x%016lx, %lu", rmt_addr, sizeof(*entry));
725 
726  rc = be->rdma_put(proc, (uintptr_t)entry, rmt_addr, sizeof(*entry), &(shared_storage->bint.buf),
727  &(photon_processes[proc].remote_eager_ledger->remote), NULL_REQUEST,
728  0, RDMA_FLAG_NIL);
729  if (rc != PHOTON_OK) {
730  dbg_err("RDMA PUT failed for 0x%016lx", req->id);
731  goto error_exit;
732  }
733 
734  return PHOTON_OK;
735  }
736  else {
737  return PHOTON_ERROR_RESOURCE;
738  }
739 
740  error_exit:
741  return PHOTON_ERROR;
742 }
743 
744 int _photon_try_rndv(int proc, void *ptr, uint64_t size, int tag,
745  photon_rid *request, photonBufferHandle db) {
746  int curr, rc;
747  uintptr_t rmt_addr;
748  photonRequest req;
749  photonRILedgerEntry entry;
750  photonBackend be = photon_processes[proc].backend;
751 
752  curr = photon_ri_ledger_get_next(proc, photon_processes[proc].remote_snd_info_ledger);
753  if (curr < 0) {
754  if (curr == -2) {
755  dbg_trace("Exceeding known receiver snd_info progress!");
756  return PHOTON_ERROR_RESOURCE;
757  }
758  goto error_exit;
759  }
760  dbg_trace("new curr == %d", curr);
761 
762  req = photon_get_request(proc);
763  if (!req) {
764  log_err("Could not get request descriptor for proc %d", proc);
765  goto error_exit;
766  }
767 
768  req->state = REQUEST_PENDING;
769  req->op = REQUEST_OP_SENDBUF;
770  req->flags = REQUEST_FLAG_NIL;
771  req->type = LEDGER;
772  req->proc = proc;
773  req->tag = tag;
774  req->size = size;
775  req->rattr.events = 1;
776  req->rattr.cookie = req->id;
777 
778  if (request != NULL) {
779  *request = req->id;
780  }
781  else {
782  log_warn("request == NULL, could not return request ID: 0x%016lx", req->id);
783  }
784 
785  rmt_addr = photon_processes[proc].remote_snd_info_ledger->remote.addr;
786  rmt_addr += curr * sizeof(*entry);
787  entry = &photon_processes[proc].remote_snd_info_ledger->entries[curr];
788 
789  // fill in what we're going to transfer
790  entry->header = 1;
791  entry->request = req->id;
792  entry->tag = tag;
793  entry->addr = (uintptr_t)ptr;
794  entry->size = size;
795  entry->priv = db->bint.buf.priv;
796  entry->footer = 1;
797  entry->flags = REQUEST_FLAG_NIL;
798 
799  dbg_trace("Post send request");
800  dbg_trace("Request: 0x%016lx", entry->request);
801  dbg_trace("Addr: %p", (void *)entry->addr);
802  dbg_trace("Size: %lu", entry->size);
803  dbg_trace("Tag: %d", entry->tag);
804  dbg_trace("Keys: 0x%016lx / 0x%016lx", entry->priv.key0, entry->priv.key1);
805  dbg_trace("Updating remote ledger address: 0x%016lx, %lu", rmt_addr, sizeof(*entry));
806 
807  rc = be->rdma_put(proc, (uintptr_t)entry, rmt_addr, sizeof(*entry), &(shared_storage->bint.buf),
808  &(photon_processes[proc].remote_snd_info_ledger->remote), NULL_REQUEST,
809  0, RDMA_FLAG_NIL);
810  if (rc != PHOTON_OK) {
811  dbg_err("RDMA PUT failed for 0x%016lx", req->id);
812  goto error_exit;
813  }
814 
815  return PHOTON_OK;
816 
817  error_exit:
818  return PHOTON_ERROR;
819 }
820 
821 int _photon_post_send_buffer_rdma(int proc, void *ptr, uint64_t size, int tag, photon_rid *request) {
823 
824  dbg_trace("(%d, %p, %lu, %d, %p)", proc, ptr, size, tag, request);
825 
826  if (buffertable_find_containing( (void*)ptr, size, &db) != 0) {
827  log_err("Requested post of send buffer for ptr not in table");
828  goto error_exit;
829  }
830 
831  int rc = _photon_try_eager(proc, ptr, size, tag, request, db);
832  if (rc == PHOTON_ERROR_RESOURCE) {
833  rc = _photon_try_rndv(proc, ptr, size, tag, request, db);
834  }
835 
836  if (rc != PHOTON_OK) {
837  return rc;
838  }
839 
840  return PHOTON_OK;
841 
842  error_exit:
843  if (request != NULL) {
844  *request = NULL_REQUEST;
845  }
846  return PHOTON_ERROR;
847 }
848 
849 int _photon_post_send_request_rdma(int proc, uint64_t size, int tag, photon_rid *request) {
850  photonBackend be = photon_processes[proc].backend;
851  photonRequest req;
852  photonRILedgerEntry entry;
853  int curr, rc;
854 
855  dbg_trace("(%d, %lu, %d, %p)", proc, size, tag, request);
856 
857  req = photon_get_request(proc);
858  if (!req) {
859  log_err("Could not get request descriptor for proc %d", proc);
860  goto error_exit;
861  }
862  // photon_post_send_request_rdma() causes an RDMA transfer, but its own completion is
863  // communicated to the task that posts it through a DTO completion event. This
864  // function informs the receiver about an upcoming send, it does NOT initiate
865  // a data transfer handshake and that's why it's not a LEDGER event.
866  req->state = REQUEST_PENDING;
867  req->op = REQUEST_OP_SENDREQ;
868  req->type = EVQUEUE;
869  req->proc = proc;
870  req->tag = tag;
871  req->size = size;
872  req->rattr.events = 1;
873  req->rattr.cookie = req->id;
874 
875  if (request != NULL) {
876  *request = req->id;
877  }
878  else {
879  log_warn("request == NULL, could not return request ID: 0x%016lx", req->id);
880  }
881 
882  curr = photon_ri_ledger_get_next(proc, photon_processes[proc].remote_snd_info_ledger);
883  if (curr < 0) {
884  if (curr == -2) {
885  return PHOTON_ERROR_RESOURCE;
886  }
887  goto error_exit;
888  }
889  dbg_trace("new curr == %d", curr);
890 
891  entry = &photon_processes[proc].remote_snd_info_ledger->entries[curr];
892  // fill in what we're going to transfer
893  // this is just an intent to transfer, no real info
894  entry->header = 1;
895  entry->request = req->id;
896  entry->tag = tag;
897  entry->addr = (uintptr_t)0;
898  entry->size = size;
899  entry->priv = (struct photon_buffer_priv_t){0, 0};
900  entry->footer = 1;
901 
902  dbg_trace("Post send request");
903  dbg_trace("Request: 0x%016lx", entry->request);
904  dbg_trace("Addr: %p", (void *)entry->addr);
905  dbg_trace("Size: %lu", entry->size);
906  dbg_trace("Tag: %d", entry->tag);
907  dbg_trace("Keys: 0x%016lx / 0x%016lx", entry->priv.key0, entry->priv.key1);
908 
909  {
910  uintptr_t rmt_addr;
911  rmt_addr = photon_processes[proc].remote_snd_info_ledger->remote.addr;
912  rmt_addr += curr * sizeof(*entry);
913 
914  rc = be->rdma_put(proc, (uintptr_t)entry, rmt_addr, sizeof(*entry), &(shared_storage->bint.buf),
915  &(photon_processes[proc].remote_rcv_info_ledger->remote), req->rattr.cookie,
916  0, RDMA_FLAG_NIL);
917  if (rc != PHOTON_OK) {
918  dbg_err("RDMA PUT failed for 0x%016lx", req->id);
919  goto error_exit;
920  }
921  }
922 
923  return PHOTON_OK;
924 
925 error_exit:
926  if (request != NULL) {
927  *request = NULL_REQUEST;
928  }
929  return PHOTON_ERROR;
930 }
931 
932 int _photon_wait_recv_buffer_rdma(int proc, uint64_t size, int tag, photon_rid *request) {
933  photonRILedgerEntry curr_entry, entry_iterator;
934  photonRequest req;
935  uint64_t curr;
936  int rc, c_ind;
937 
938  dbg_trace("(%d, %d)", proc, tag);
939 
940  start:
941  curr = sync_load(&photon_processes[proc].local_rcv_info_ledger->curr, SYNC_RELAXED);
942  c_ind = curr & (photon_processes[proc].local_rcv_info_ledger->num_entries - 1);
943  curr_entry = &(photon_processes[proc].local_rcv_info_ledger->entries[c_ind]);
944 
945  dbg_trace("Spinning on info ledger looking for receive request");
946  dbg_trace("looking in position %d/%p", c_ind, curr_entry);
947 
948  entry_iterator = curr_entry;
949  do {
950  while (entry_iterator->header == 0 || entry_iterator->footer == 0) {
951  rc = __photon_try_one_event(&req);
952  if (rc == PHOTON_EVENT_ERROR) {
953  dbg_err("Failure getting event");
954  return PHOTON_ERROR;
955  }
956  ;
957  }
958  if( (tag < 0) || (entry_iterator->tag == tag ) ) {
959  if (sync_cas(&photon_processes[proc].local_rcv_info_ledger->curr, &curr, curr+1, SYNC_RELAXED, SYNC_RELAXED)) {
960  break;
961  }
962  else goto start;
963  }
964  } while(1);
965 
966  if (request != NULL) {
967  photonRequest req;
968  req = photon_setup_request_ledger_info(curr_entry, c_ind, proc);
969  if (req == NULL) {
970  log_err("Could not setup request for proc %d", proc);
971  goto error_exit;
972  }
973  *request = req->id;
974  sync_fadd(&photon_processes[proc].local_rcv_info_ledger->prog, 1, SYNC_RELAXED);
975  }
976 
977  return PHOTON_OK;
978  error_exit:
979  return PHOTON_ERROR;
980 }
981 
982 int _photon_wait_send_buffer_rdma(int proc, uint64_t size, int tag, photon_rid *request) {
983  photonRDMALedgerEntry eager_entry, entries;
984  photonRILedgerEntry curr_entry, entry_iterator;
985  uint64_t curr, curr_eager;
986  int c_ind, ce_ind;
987  bool eager = false;
988 
989  dbg_trace("(%d, %d)", proc, tag);
990 
991  start:
992 
993  curr = sync_load(&photon_processes[proc].local_snd_info_ledger->curr, SYNC_RELAXED);
994  c_ind = curr & (photon_processes[proc].local_snd_info_ledger->num_entries - 1);
995  curr_entry = &(photon_processes[proc].local_snd_info_ledger->entries[c_ind]);
996 
997  curr_eager = sync_load(&photon_processes[proc].local_eager_ledger->curr, SYNC_RELAXED);
998  ce_ind = curr_eager & (photon_processes[proc].local_eager_ledger->num_entries - 1);
999  entries = photon_processes[proc].local_eager_ledger->entries;
1000  eager_entry = &(entries[ce_ind]);
1001 
1002  dbg_trace("Spinning on info/eager ledger looking for receive request");
1003  dbg_trace("looking in position %d/%p (%d/%p)", c_ind, curr_entry, ce_ind, eager_entry);
1004 
1005  entry_iterator = curr_entry;
1006 
1007  do {
1008  while((entry_iterator->header == 0 || entry_iterator->footer == 0) && (eager_entry->request == 0)) {
1009  ;
1010  }
1011  if (eager_entry->request && (size == PHOTON_ANY_SIZE)) {
1012  if (sync_cas(&photon_processes[proc].local_eager_ledger->curr, &curr_eager, curr_eager+1, SYNC_RELAXED, SYNC_RELAXED)) {
1013  eager = true;
1014  break;
1015  }
1016  else goto start;
1017  }
1018  else if (eager_entry->request && (size == eager_entry->request>>32)) {
1019  if (sync_cas(&photon_processes[proc].local_eager_ledger->curr, &curr_eager, curr_eager+1, SYNC_RELAXED, SYNC_RELAXED)) {
1020  eager = true;
1021  break;
1022  }
1023  else goto start;
1024  }
1025  else if( ((tag < 0) || (entry_iterator->tag == tag )) && (size == PHOTON_ANY_SIZE) ) {
1026  if (sync_cas(&photon_processes[proc].local_snd_info_ledger->curr, &curr, curr+1, SYNC_RELAXED, SYNC_RELAXED)) {
1027  break;
1028  }
1029  else goto start;
1030  }
1031  else if (((tag < 0) || (entry_iterator->tag == tag )) && (size == entry_iterator->size)) {
1032  if (sync_cas(&photon_processes[proc].local_snd_info_ledger->curr, &curr, curr+1, SYNC_RELAXED, SYNC_RELAXED)) {
1033  break;
1034  }
1035  else goto start;
1036  }
1037  } while(1);
1038 
1039  if (request != NULL) {
1040  photonRequest req;
1041  if (eager) {
1042  req = photon_setup_request_ledger_eager(eager_entry, ce_ind, proc);
1043  sync_fadd(&photon_processes[proc].local_eager_ledger->prog, 1, SYNC_RELAXED);
1044  }
1045  else {
1046  req = photon_setup_request_ledger_info(curr_entry, c_ind, proc);
1047  sync_fadd(&photon_processes[proc].local_snd_info_ledger->prog, 1, SYNC_RELAXED);
1048  }
1049  if (req == NULL) {
1050  log_err("Could not setup request for proc %d", proc);
1051  goto error_exit;
1052  }
1053  *request = req->id;
1054  }
1055 
1056  return PHOTON_OK;
1057 error_exit:
1058  return PHOTON_ERROR;
1059 }
1060 
1062  photonRILedgerEntry curr_entry, entry_iterator;
1063  int iproc, still_searching = 1;
1064 #ifdef ENABLE_DEBUG
1065  time_t stime;
1066 #endif
1067  uint64_t curr;
1068  int c_ind;
1069 
1070  dbg_trace("(%d)", tag);
1071 
1072  dbg_trace("Spinning on send info ledger looking for send request");
1073 
1074  iproc = -1;
1075 #ifdef ENABLE_DEBUG
1076  stime = time(NULL);
1077 #endif
1078  do {
1079  iproc = (iproc+1)%_photon_nproc;
1080  curr = sync_load(&photon_processes[iproc].local_snd_info_ledger->curr, SYNC_RELAXED);
1081  c_ind = curr & (photon_processes[iproc].local_snd_info_ledger->num_entries - 1);
1082  curr_entry = &(photon_processes[iproc].local_snd_info_ledger->entries[c_ind]);
1083  dbg_trace("looking in position %d/%p for proc %d", c_ind, curr_entry, iproc);
1084 
1085  entry_iterator = curr_entry;
1086  while(entry_iterator->header == 1 && entry_iterator->footer == 1)
1087  if( (entry_iterator->addr == (uintptr_t)0) && (entry_iterator->priv.key0 == 0) && ((tag < 0) || (entry_iterator->tag == tag )) ) {
1088  if (sync_cas(&photon_processes[iproc].local_snd_info_ledger->curr, &curr, curr+1, SYNC_RELAXED, SYNC_RELAXED)) {
1089  dbg_trace("Found matching send request with tag %d from proc %d", tag, iproc);
1090  still_searching = 0;
1091  break;
1092  }
1093  }
1094 #ifdef ENABLE_DEBUG
1095  stime = _tictoc(stime, -1);
1096 #endif
1097  } while(still_searching);
1098 
1099  curr_entry->header = 0;
1100  curr_entry->footer = 0;
1101  sync_fadd(&photon_processes[iproc].local_snd_info_ledger->prog, 1, SYNC_RELAXED);
1102 
1103  return PHOTON_OK;
1104 }
1105 
1106 int _photon_post_os_put(photon_rid request, int proc, void *ptr, uint64_t size,
1107  int tag, uint64_t r_offset) {
1108  photonBackend be = photon_processes[proc].backend;
1109  photonRequest req;
1110  photonBuffer drb;
1111  photonBufferHandle db;
1112  int rc;
1113 
1114  dbg_trace("(%d, %p, %lu, %lu, %lu)", proc, ptr, size, r_offset, request);
1115 
1116  if ((req = photon_lookup_request(request)) == NULL) {
1117  log_err("Could not find request");
1118  goto error_exit;
1119  }
1120 
1121  if (request != req->id) {
1122  log_err("Request mismatch encountered!");
1123  goto error_exit;
1124  }
1125 
1126  if (proc != req->proc) {
1127  log_err("Request/proc mismatch: %d/%d", proc, req->proc);
1128  goto error_exit;
1129  }
1130 
1131  // photon_post_os_put() causes an RDMA transfer, but its own completion is
1132  // communicated to the task that posts it through a completion event
1133  req->type = EVQUEUE;
1134  req->tag = tag;
1135  req->state = REQUEST_PENDING;
1136  req->flags |= REQUEST_FLAG_WFIN;
1137 
1138  /* get the remote buffer saved in the request */
1139  drb = &(req->remote_info.buf);
1140 
1141  if (buffertable_find_containing( (void *)ptr, size, &db) != 0) {
1142  log_err("Tried posting a send for a buffer not registered");
1143  goto error_exit;
1144  }
1145 
1146  if (drb->size > 0 && size + r_offset > drb->size) {
1147  log_err("Requested to send %lu bytes to a buffer of size %lu at offset %lu", size, drb->size, r_offset);
1148  goto error_exit;
1149  }
1150 
1151  dbg_trace("Posting Request ID: %d/%lu", proc, request);
1152 
1153  {
1154  rc = be->rdma_put(proc, (uintptr_t)ptr, drb->addr + (uintptr_t)r_offset,
1155  size, &(db->bint.buf), drb, request, 0, RDMA_FLAG_NIL);
1156 
1157  if (rc != PHOTON_OK) {
1158  dbg_err("RDMA PUT failed for 0x%016lx", request);
1159  goto error_exit;
1160  }
1161  }
1162 
1163  return PHOTON_OK;
1164 
1165  error_exit:
1166  return PHOTON_ERROR;
1167 }
1168 
1169 int _photon_post_os_get(photon_rid request, int proc, void *ptr, uint64_t size,
1170  int tag, uint64_t r_offset) {
1171  photonBackend be = photon_processes[proc].backend;
1172  photonRequest req;
1173  photonBuffer drb;
1174  photonBufferHandle db;
1175  int rc;
1176 
1177  dbg_trace("(%d, %p, %lu, %lu, 0x%016lx)", proc, ptr, size, r_offset, request);
1178 
1179  if ((req = photon_lookup_request(request)) == NULL) {
1180  log_err("Could not find request");
1181  goto error_exit;
1182  }
1183 
1184  if (request != req->id) {
1185  log_err("Request mismatch encountered!");
1186  goto error_exit;
1187  }
1188 
1189  if (proc != req->proc) {
1190  log_err("Request/proc mismatch: %d/%d", proc, req->proc);
1191  goto error_exit;
1192  }
1193 
1194  /* photon_post_os_get() causes an RDMA transfer, but its own completion is
1195  communicated to the task that posts it through a completion event. */
1196  req->type = EVQUEUE;
1197  req->tag = tag;
1198  req->state = REQUEST_PENDING;
1199  req->flags |= REQUEST_FLAG_WFIN;
1200 
1201  /* get the remote buffer saved in the request */
1202  drb = &(req->remote_info.buf);
1203 
1204  if (buffertable_find_containing( (void *)ptr, size, &db) != 0) {
1205  log_err("Tried posting a os_get() into a buffer that's not registered");
1206  return -1;
1207  }
1208 
1209  if ( (drb->size > 0) && ((size+r_offset) > drb->size) ) {
1210  log_err("Requested to get %lu bytes from a %lu buffer size at offset %lu", size, drb->size, r_offset);
1211  return -2;
1212  }
1213 
1214  if (req->flags & REQUEST_FLAG_EAGER) {
1215  photonEagerBuf eb = photon_processes[proc].local_eager_buf;
1216  uint64_t offset, curr, new, left;
1217 
1218  curr = sync_load(&eb->curr, SYNC_RELAXED);
1219  offset = curr & (eb->size - 1);
1220  left = eb->size - offset;
1221  if (left < size) {
1222  new = curr + left + size;;
1223  offset = 0;
1224  }
1225  else {
1226  new = curr + size;
1227  }
1228 
1229  if (sync_cas(&eb->curr, &curr, new, SYNC_RELAXED, SYNC_RELAXED)) {
1230  dbg_trace("EAGER copy message of size %lu from addr: 0x%016lx (offset=%lu)",
1231  size, (uintptr_t)&eb->data[offset], offset);
1232  memcpy(ptr, &eb->data[offset], size);
1233  memset(&eb->data[offset], 0, size);
1234  req->flags |= REQUEST_FLAG_EDONE;
1235  sync_store(&eb->prog, new, SYNC_RELAXED);
1236  }
1237 
1238  return PHOTON_OK;
1239  }
1240 
1241  dbg_trace("Posted Request ID: %d/0x%016lx", proc, request);
1242 
1243  {
1244  rc = be->rdma_get(proc, (uintptr_t)ptr, drb->addr + (uintptr_t)r_offset,
1245  size, &(db->bint.buf), drb, request, 0);
1246 
1247  if (rc != PHOTON_OK) {
1248  dbg_err("RDMA GET failed for 0x%016lx\n", request);
1249  goto error_exit;
1250  }
1251  }
1252 
1253  return PHOTON_OK;
1254 
1255  error_exit:
1256  return PHOTON_ERROR;
1257 }
1258 
1259 int _photon_post_os_put_direct(int proc, void *ptr, uint64_t size, photonBuffer rbuf,
1260  int flags, photon_rid *request) {
1261  photonBackend be = photon_processes[proc].backend;
1262  photonBufferHandle db;
1263  photonRequest req;
1264  struct photon_buffer_t lbuf;
1265  int rc;
1266 
1267  dbg_trace("(%d, %p, %lu, %lu, %p)", proc, ptr, size, rbuf->size, request);
1268 
1269  if (buffertable_find_containing( (void *)ptr, size, &db) != 0) {
1270  log_err("Tried posting a os_put_direct() from a buffer that's not registered");
1271  return -1;
1272  }
1273 
1274  lbuf.addr = (uintptr_t)ptr;
1275  lbuf.size = size;
1276  lbuf.priv = db->bint.buf.priv;
1277 
1278  req = photon_setup_request_direct(&lbuf, rbuf, size, proc, 1);
1279  if (req == NULL) {
1280  dbg_trace("Could not setup direct buffer request for proc %d", proc);
1281  goto error_exit;
1282  }
1283 
1284  *request = req->id;
1285 
1286  {
1287  rc = be->rdma_put(proc, (uintptr_t)ptr, rbuf->addr,
1288  size, &(db->bint.buf), rbuf, req->rattr.cookie,
1289  0, RDMA_FLAG_NIL);
1290 
1291  if (rc != PHOTON_OK) {
1292  dbg_err("RDMA PUT failed for 0x%016lx", req->rattr.cookie);
1293  goto error_exit;
1294  }
1295 
1296  dbg_trace("Posted Proc/Request/Cookie: %d/0x%016lx/0x%016lx", proc, req->id, req->rattr.cookie);
1297  }
1298 
1299  return PHOTON_OK;
1300 
1301  error_exit:
1302  if (request != NULL) {
1303  *request = NULL_REQUEST;
1304  }
1305  return PHOTON_ERROR;
1306 }
1307 
1308 int _photon_post_os_get_direct(int proc, void *ptr, uint64_t size, photonBuffer rbuf, int flags, photon_rid *request) {
1309  photonBackend be = photon_processes[proc].backend;
1310  photonBufferHandle db;
1311  photonRequest req;
1312  struct photon_buffer_t lbuf;
1313  int rc;
1314 
1315  dbg_trace("(%d, %p, %lu, %lu, %p)", proc, ptr, size, rbuf->size, request);
1316 
1317  if (buffertable_find_containing( (void *)ptr, size, &db) != 0) {
1318  log_err("Tried posting a os_get_direct() from a buffer that's not registered");
1319  return -1;
1320  }
1321 
1322  lbuf.addr = (uintptr_t)ptr;
1323  lbuf.size = size;
1324  lbuf.priv = db->bint.buf.priv;
1325 
1326  req = photon_setup_request_direct(&lbuf, rbuf, size, proc, 1);
1327  if (req == NULL) {
1328  dbg_trace("Could not setup direct buffer request for proc %d", proc);
1329  goto error_exit;
1330  }
1331 
1332  *request = req->id;
1333 
1334  {
1335  rc = be->rdma_get(proc, (uintptr_t)ptr, rbuf->addr, size,
1336  &(db->bint.buf), rbuf, req->rattr.cookie,
1337  RDMA_FLAG_NIL);
1338 
1339  if (rc != PHOTON_OK) {
1340  dbg_err("RDMA GET failed for 0x%016lx", req->rattr.cookie);
1341  goto error_exit;
1342  }
1343 
1344  dbg_trace("Posted Proc/Request/Cookie: %d/0x%016lx/0x%016lx", proc, req->id, req->rattr.cookie);
1345  }
1346 
1347  return PHOTON_OK;
1348 
1349 error_exit:
1350  if (request != NULL) {
1351  *request = NULL_REQUEST;
1352  }
1353  return PHOTON_ERROR;
1354 }
1355 
1356 int _photon_send_FIN(photon_rid request, int proc, int flags) {
1357  photonBackend be = photon_processes[proc].backend;
1358  photonRequest req;
1359  photonRDMALedgerEntry entry, entries;
1360  int curr, rc;
1361 
1362  dbg_trace("(%d, 0x%016lx)", proc, request);
1363 
1364  if ((req = photon_lookup_request(request)) == NULL) {
1365  log_err("Could not find request");
1366  goto error_exit;
1367  }
1368 
1369  if (req->state != REQUEST_COMPLETED) {
1370  dbg_trace("Warning: sending FIN for a request (EVQUEUE) that is not in completed state (state==%d)", req->state);
1371  }
1372 
1373  if (req->remote_info.id.u64 == NULL_REQUEST) {
1374  log_err("Trying to FIN a remote buffer request that was never set!");
1375  goto error_exit;
1376  }
1377 
1378  curr = photon_rdma_ledger_get_next(proc, photon_processes[proc].remote_fin_ledger);
1379  if (curr < 0) {
1380  if (curr == -2) {
1381  dbg_trace("Exceeding known receiver FIN progress!");
1382  return PHOTON_ERROR_RESOURCE;
1383  }
1384  goto error_exit;
1385  }
1386 
1387  entries = photon_processes[proc].remote_fin_ledger->entries;
1388  entry = &(entries[curr]);
1389  dbg_trace("photon_processes[%d].remote_fin_ledger->curr==%d", proc, curr);
1390 
1391  if( entry == NULL ) {
1392  log_err("entry is NULL for proc=%d", proc);
1393  goto error_exit;
1394  }
1395 
1396  entry->request = req->remote_info.id.u64;
1397 
1398  {
1399  uintptr_t rmt_addr;
1400  rmt_addr = photon_processes[proc].remote_fin_ledger->remote.addr;
1401  rmt_addr += curr * sizeof(*entry);
1402 
1403  rc = be->rdma_put(proc, (uintptr_t)entry, rmt_addr, sizeof(*entry), &(shared_storage->bint.buf),
1404  &(photon_processes[proc].remote_fin_ledger->remote), NULL_REQUEST,
1405  0, RDMA_FLAG_NIL);
1406  if (rc != PHOTON_OK) {
1407  dbg_err("RDMA PUT failed for 0x%016lx", NULL_REQUEST);
1408  goto error_exit;
1409  }
1410  }
1411 
1412  if (req->state == REQUEST_COMPLETED || flags & PHOTON_REQ_COMPLETED) {
1413  dbg_trace("Removing request 0x%016lx for remote buffer request 0x%016lx", request, req->remote_info.id);
1414  req->state = REQUEST_COMPLETED;
1415  photon_free_request(req);
1416  dbg_trace("%d requests left in reqtable for proc %d", photon_count_request(req->proc), req->proc);
1417  }
1418  else {
1419  req->flags &= ~REQUEST_FLAG_WFIN;
1420  }
1421 
1422  MARK_DONE(photon_processes[proc].remote_fin_ledger, 1);
1423 
1424  return PHOTON_OK;
1425 
1426 error_exit:
1427  return PHOTON_ERROR;
1428 }
1429 
1430 // Polls EVQ waiting for an event associated with a request.
1431 // Returns the request associated with the event, but only removes
1432 // request if it is an EVQUEUE event, not LEDGER.
1433 int _photon_wait_any(int *ret_proc, photon_rid *ret_req) {
1434  int rc;
1435  photonRequest req;
1436 
1437  if (ret_req == NULL) {
1438  goto error_exit;
1439  }
1440 
1441  do {
1442  rc = __photon_try_one_event(&req);
1443  } while (rc != PHOTON_EVENT_REQCOMP);
1444 
1445  *ret_proc = req->proc;
1446  *ret_req = req->id;
1447 
1448  photon_free_request(req);
1449 
1450  return PHOTON_OK;
1451  error_exit:
1452  return PHOTON_ERROR;
1453 }
1454 
1455 int _photon_wait_any_ledger(int *ret_proc, photon_rid *ret_req) {
1456  static int i = -1; // this is static so we don't starve events in later processes
1457  uint64_t curr;
1458  int c_ind;
1459 
1460  if (ret_req == NULL || ret_proc == NULL) {
1461  goto error_exit;
1462  }
1463 
1465  log_warn("No outstanding requests to wait for");
1466  }
1467 
1468  while(1) {
1469  photonRDMALedgerEntry curr_entry, entries;
1470  photonLedger l;
1471 
1472  i= (i+1) % _photon_nproc;
1473  l = photon_processes[i].local_fin_ledger;
1474  entries = l->entries;
1475  // check if an event occurred on the RDMA end of things
1476  curr = sync_load(&l->curr, SYNC_RELAXED);
1477  c_ind = curr & (l->num_entries - 1);
1478  curr_entry = &(entries[c_ind]);
1479  dbg_trace("Wait All Out: %d", c_ind);
1480 
1481  if ((curr_entry->request != (uint64_t) 0) &&
1482  sync_cas(&l->curr, &curr, curr+1, SYNC_RELAXED, SYNC_RELAXED)) {
1483  photonRequest req;
1484  dbg_trace("Wait All In: %d/0x%016lx", c_ind, curr_entry->request);
1485 
1486  req = photon_lookup_request(curr_entry->request);
1487  if (req != NULL) {
1488  *ret_req = curr_entry->request;
1489  *ret_proc = i;
1490  photon_free_request(req);
1491  break;
1492  }
1493  curr_entry->request = NULL_REQUEST;
1494  sync_fadd(&l->prog, 1, SYNC_RELAXED);
1495  }
1496  }
1497 
1498  return PHOTON_OK;
1499 
1500 error_exit:
1501  return PHOTON_ERROR;
1502 }
1503 
1504 int _photon_probe_ledger(int proc, int *flag, int type, photonStatus status) {
1505  photonRILedger ledger;
1506  photonRDMALedgerEntry eager_entry, entries;
1507  photonRILedgerEntry entry_iterator;
1508  uint64_t curr;
1509  int i;
1510  int start, end, c_ind;
1511 
1512  //dbg_trace("(%d, %d)", proc, type);
1513 
1514  *flag = 0;
1515 
1516  if (proc == PHOTON_ANY_SOURCE) {
1517  start = 0;
1518  end = _photon_nproc;
1519  }
1520  else {
1521  start = proc;
1522  end = proc+1;
1523  }
1524 
1525  for (i=start; i<end; i++) {
1526 
1527  switch (type) {
1528  case PHOTON_PROBE_SLEDGER:
1529  ledger = photon_processes[i].local_snd_info_ledger;
1530  break;
1531  case PHOTON_PROBE_RLEDGER:
1532  ledger = photon_processes[i].local_rcv_info_ledger;
1533  break;
1534  default:
1535  dbg_err("unknown ledger type");
1536  goto error_exit;
1537  }
1538 
1539  //for (j=0; j<ledger->num_entries; j++) {
1540  {
1541  // process any eager entry first
1542  if (type == PHOTON_PROBE_SLEDGER) {
1543  curr = sync_load(&photon_processes[i].local_eager_ledger->curr, SYNC_RELAXED);
1544  c_ind = curr & (photon_processes[i].local_eager_ledger->num_entries - 1);
1545  entries = photon_processes[i].local_eager_ledger->entries;
1546  eager_entry = &(entries[c_ind]);
1547  if (eager_entry->request) {
1548  status->src_addr.global.proc_id = i;
1549  status->request = eager_entry->request;
1550  status->size = eager_entry->request>>32;
1551 
1552  *flag = 1;
1553 
1554  return PHOTON_OK;
1555  }
1556  }
1557 
1558  curr = sync_load(&ledger->curr, SYNC_RELAXED);
1559  c_ind = curr & (ledger->num_entries - 1);
1560  entry_iterator = &(ledger->entries[c_ind]);
1561  if (entry_iterator->header && entry_iterator->footer && (entry_iterator->tag > 0)) {
1562  status->src_addr.global.proc_id = i;
1563  status->request = entry_iterator->request;
1564  status->tag = entry_iterator->tag;
1565  status->size = entry_iterator->size;
1566 
1567  dbg_trace("Request: 0x%016lx", entry_iterator->request);
1568  dbg_trace("Address: %p", (void *)entry_iterator->addr);
1569  dbg_trace("Size: %lu", entry_iterator->size);
1570  dbg_trace("Tag: %d", entry_iterator->tag);
1571 
1572  *flag = 1;
1573 
1574  return PHOTON_OK;
1575  }
1576  }
1577  }
1578 
1579 error_exit:
1580  return PHOTON_ERROR;
1581 }
1582 
1583 /* similar to photon_test()
1584  0 if some request ready to pop
1585  1 if no request found
1586  -1 on error */
1587 int _photon_probe(photonAddr addr, int *flag, photonStatus status) {
1588  //char buf[40];
1589  //inet_ntop(AF_INET6, addr->raw, buf, 40);
1590  //dbg_trace("(%s)", buf);
1591 
1592  photonRequest req = NULL;
1593  int rc = 0;
1594 
1595  if (req) {
1596  *flag = 1;
1597  status->src_addr.global.proc_id = req->proc;
1598  status->request = req->id;
1599  status->tag = req->tag;
1600  status->size = req->size;
1601  status->count = 1;
1602  status->error = 0;
1603  dbg_trace("returning 0, flag:1");
1604  return PHOTON_OK;
1605  }
1606  else {
1607  *flag = 0;
1608  //dbg_trace("returning %d, flag:0", rc);
1609  return rc;
1610  }
1611 }
1612 
1613 /* begin I/O */
1614 int _photon_io_init(char *file, int amode, void *view, int niter) {
1615  /* forwarders do our I/O for now */
1616  if (__photon_forwarder != NULL) {
1617  return __photon_forwarder->io_init(photon_processes, file, amode, view, niter);
1618  }
1619  else {
1620  return PHOTON_ERROR;
1621  }
1622 }
1623 
1625  /* forwarders do our I/O for now */
1626  if (__photon_forwarder != NULL) {
1627  return __photon_forwarder->io_finalize(photon_processes);
1628  }
1629  else {
1630  return PHOTON_ERROR;
1631  }
1632 }
1633 /* end I/O */
1634 
1635 /* begin util */
1637  if (!raddr) {
1638  return PHOTON_ERROR;
1639  }
1640 
1641  // see if we have a block_id to send to
1642  if (!(addr->blkaddr.blk0) &&
1643  !(addr->blkaddr.blk1) &&
1644  !(addr->blkaddr.blk2)) {
1645  if (__photon_config->ibv.ud_gid_prefix) {
1646  inet_pton(AF_INET6, __photon_config->ibv.ud_gid_prefix, raddr->raw);
1647  uint32_t *iptr = (uint32_t*)&(raddr->raw[12]);
1648  *iptr = htonl(addr->blkaddr.blk3);
1649  }
1650  else {
1651  dbg_err("block_id, missing ud_gid_prefix?");
1652  return PHOTON_ERROR;
1653  }
1654  }
1655  else {
1656  raddr->global.prefix = addr->global.prefix;
1657  raddr->global.proc_id = addr->global.proc_id;
1658  }
1659 
1660  return PHOTON_OK;
1661 }
1662 
1664  const struct photon_buffer_priv_t **pptr) {
1665  photonBufferHandle db;
1666 
1667  if (buffertable_find_containing(addr, size, &db) != 0) {
1668  dbg_warn("Could not find buffer: 0x%016lx of size %lu", (uintptr_t)addr, size);
1669  goto error_exit;
1670  }
1671 
1672  if (!db->is_registered) {
1673  dbg_err("Could not lookup private buffer info on unregistered buffer");
1674  goto error_exit;
1675  }
1676 
1677  *pptr = (const struct photon_buffer_priv_t *)&(db->bint.buf.priv);
1678 
1679  return PHOTON_OK;
1680 
1681  error_exit:
1682  *pptr = NULL;
1683  return PHOTON_ERROR;
1684 }
1685 
1686 int _photon_get_buffer_remote(photon_rid request, photonBuffer ret_buf) {
1687  photonRequest req;
1688 
1689  if ((req = photon_lookup_request(request)) == NULL) {
1690  log_err("Could not find request 0x%016lx", request);
1691  goto error_exit;
1692  }
1693 
1694  if ((req->state != REQUEST_NEW) && (req->state != REQUEST_PENDING)) {
1695  dbg_trace("Request has already trasitioned, can not return remote buffer info");
1696  goto error_exit;
1697  }
1698 
1699  if (ret_buf) {
1700  (*ret_buf).addr = req->remote_info.buf.addr;
1701  (*ret_buf).size = req->remote_info.buf.size;
1702  (*ret_buf).priv = req->remote_info.buf.priv;
1703  }
1704 
1705  return PHOTON_OK;
1706 
1707  error_exit:
1708  ret_buf = NULL;
1709  return PHOTON_ERROR;
1710 }
1711 /* end util */
1712 
1713 #ifdef HAVE_XSP
1714 int photon_xsp_lookup_proc(libxspSess *sess, ProcessInfo **ret_pi, int *index) {
1715  int i;
1716 
1717  for (i = 0; i < _photon_nproc; i++) {
1718  if (photon_processes[i].sess &&
1719  !xsp_sesscmp(photon_processes[i].sess, sess)) {
1720  if (index)
1721  *index = i;
1722  *ret_pi = &photon_processes[i];
1723  return PHOTON_OK;
1724  }
1725  }
1726 
1727  if (index)
1728  *index = -1;
1729  *ret_pi = NULL;
1730  return PHOTON_ERROR;
1731 }
1732 
1733 int photon_xsp_unused_proc(ProcessInfo **ret_pi, int *index) {
1734  int i;
1735 
1736  /* find a process struct that has no session... */
1737  for (i = 0; i < _photon_nproc; i++) {
1738  if (!photon_processes[i].sess)
1739  break;
1740  }
1741 
1742  if (i == _photon_nproc) {
1743  if (index)
1744  *index = -1;
1745  *ret_pi = NULL;
1746  return PHOTON_ERROR;
1747  }
1748 
1749  if (index)
1750  *index = i;
1751 
1752  *ret_pi = &photon_processes[i];
1753  return PHOTON_OK;
1754 }
1755 #endif
int _photon_try_eager(int proc, void *ptr, uint64_t size, int tag, photon_rid *request, photonBufferHandle db)
#define REQUEST_FLAG_LDONE
int _photon_post_send_request_rdma(int proc, uint64_t size, int tag, photon_rid *request)
int __photon_nbpop_sr(photonRequest req)
Definition: photon_event.c:272
#define REQUEST_COOK_RECV
Convenience pointer type for the buffer structure.
Definition: photon.h:105
int buffertable_remove(photonBufferHandle buf)
#define RDMA_FLAG_NIL
photonRequest photon_lookup_request(photon_rid rid)
#define PHOTON_NP_PBUF_SIZE
#define PHOTON_EVENT_REQCOMP
Definition: photon_event.h:26
photonBufferHandle photon_buffer_create(void *addr, uint64_t size, int flags)
#define LEDGER
int _photon_send(photonAddr addr, void *ptr, uint64_t size, int flags, photon_rid *request)
int _photon_wait_recv_buffer_rdma(int proc, uint64_t size, int tag, photon_rid *request)
#define PHOTON_NP_LEDG_SIZE
photonRequest photon_setup_request_ledger_eager(photonRDMALedgerEntry entry, int curr, int proc)
struct photon_ri_ledger_entry_t * photonRILedgerEntry
PHOTON_INTERNAL int _photon_initialized(void)
photonBufferHandle shared_storage
#define PHOTON_LRI_PTR(a)
#define REQUEST_FLAG_EAGER
int photon_count_request(int proc)
int _photon_smsize
Definition: libphoton.c:60
int _LEDGER_SIZE
Definition: libphoton.c:53
#define REQUEST_OP_RECVBUF
struct photon_buffer_handle_t * photonBufferHandle
#define EVQUEUE
photonRequest photon_setup_request_direct(photonBuffer lbuf, photonBuffer rbuf, uint64_t size, int proc, int events)
photonLedger remote_eager_ledger
int _photon_nproc
Definition: libphoton.c:55
#define PHOTON_PROBE_RLEDGER
RNDV probe flag: return RECV ledger completion IDs.
Definition: photon.h:51
int photon_wait_send_request_rdma(int tag)
Definition: libphoton.c:438
struct photon_rdma_ledger_entry_t * photonRDMALedgerEntry
photonBackend __photon_shmem
Definition: libphoton.c:49
#define PHOTON_ANY_SIZE
RNDV flag: return completions of any size.
Definition: photon.h:55
uint64_t photon_rid
The Photon request ID.
Definition: photon.h:75
photonBackend __photon_fabric
Definition: libphoton.c:50
photonRILedger remote_rcv_info_ledger
#define PHOTON_NP_EBUF_SIZE
#define PHOTON_NP_PEDG_SIZE
#define PHOTON_ANY_SOURCE
RNDV and PWC flag: return completions from any source.
Definition: photon.h:54
int _photon_pbsize
Definition: libphoton.c:59
#define PHOTON_LPB_PTR(a)
int _photon_nforw
Definition: libphoton.c:56
#define LEDGER_ALL
struct photon_ri_ledger_t * photonRILedger
#define REQUEST_FLAG_WFIN
int buffertable_find_exact(void *addr, uint64_t size, photonBufferHandle *result)
#define PHOTON_ERROR_NOINIT
Error code, subsystem not initialized.
Definition: photon.h:31
#define PHOTON_LEB_PTR(a)
#define PHOTON_NP_INFO_SIZE
#define REQUEST_COOK_EAGER
int _photon_post_os_get_direct(int proc, void *ptr, uint64_t size, photonBuffer rbuf, int flags, photon_rid *request)
int _photon_finalize()
int photon_msgbuffer_free(photonMsgBuf mbuf)
int _photon_upsize
Definition: libphoton.c:61
union photon_addr_t * photonAddr
Convenience pointer type for the address union.
Definition: photon.h:112
#define PHOTON_NP_RPEDG_SIZE
int _photon_post_recv_buffer_rdma(int proc, void *ptr, uint64_t size, int tag, photon_rid *request)
struct photon_buffer_priv_t priv
The associated private buffer information.
Definition: photon.h:108
#define MARK_DONE(e, s)
int buffertable_insert(photonBufferHandle buf)
int _photon_myrank
Definition: libphoton.c:54
int __photon_nbpop_ledger(photonRequest req)
Definition: photon_event.c:285
#define PHOTON_LE_PTR(a)
photonRequest photon_get_request(int proc)
int __photon_nbpop_event(photonRequest req)
Definition: photon_event.c:233
int photon_setup_eager_buf(ProcessInfo *photon_processes, char *buf, int size)
photonForwarder __photon_forwarder
Definition: libphoton.c:51
photonBackend backend
int _photon_init(photonConfig cfg, ProcessInfo *info, photonBufferHandle ss)
int _photon_wait_send_request_rdma(int tag)
void photon_buffer_free(photonBufferHandle buf)
int photon_rdma_eager_buf_get_offset(int proc, photonEagerBuf buf, int size, int lim)
int _photon_recv(photon_rid request, void *ptr, uint64_t size, int flags)
photonLedger remote_fin_ledger
#define PHOTON_LP_PTR(a)
struct photon_req_t * photonRequest
ProcessInfo * photon_processes
#define PHOTON_REQ_SHUTDOWN
CANCEL flag: clear all outstanding requests.
Definition: photon.h:45
int _photon_post_os_put(photon_rid request, int proc, void *ptr, uint64_t size, int tag, uint64_t r_offset)
int _photon_wait_any(int *ret_proc, photon_rid *ret_req)
int _photon_wait(photon_rid request)
#define NULL_REQUEST
#define PHOTON_PROBE_SLEDGER
RNDV probe flag: return SEND ledger completion IDs.
Definition: photon.h:50
int photon_buffer_register(photonBufferHandle buf, int flags)
int _photon_io_finalize()
int photon_setup_pwc_buf(ProcessInfo *photon_processes, char *buf, int size)
int photon_setup_ri_ledger(ProcessInfo *photon_processes, char *buf, int num_entries)
#define PHOTON_EVENT_NONE
Definition: photon_event.h:25
#define PHOTON_ERROR
Error code, general error.
Definition: photon.h:32
int photon_free_request(photonRequest req)
photonLedger local_eager_ledger
uint64_t size
The size of the buffer in bytes.
Definition: photon.h:107
int photon_ri_ledger_get_next(int proc, photonRILedger l)
int _photon_send_FIN(photon_rid request, int proc, int flags)
int _photon_handle_addr(photonAddr addr, photonAddr raddr)
photonRILedger local_snd_info_ledger
Use Verbs photon backend.
Definition: photon_config.h:42
#define REQUEST_FREE
int _photon_get_buffer_remote(photon_rid request, photonBuffer ret_buf)
int _photon_post_os_get(photon_rid request, int proc, void *ptr, uint64_t size, int tag, uint64_t r_offset)
int photon_buffer_unregister(photonBufferHandle buf)
#define PHOTON_OK
Photon success code.
Definition: photon.h:30
#define REQUEST_NEW
int photon_rdma_ledger_get_next(int proc, photonLedger l)
int _photon_get_buffer_private(void *addr, uint64_t size, const struct photon_buffer_priv_t **pptr)
#define REQUEST_FLAG_EDONE
#define PHOTON_REQ_COMPLETED
RNDV flag: explicitly set a request completed for FIN.
Definition: photon.h:42
#define REQUEST_OP_SENDBUF
int _photon_ebsize
Definition: libphoton.c:58
int buffertable_find_containing(void *addr, uint64_t size, photonBufferHandle *result)
photonRILedger local_rcv_info_ledger
photonMsgBuf photon_msgbuffer_new(uint64_t size, uint64_t p_size, int p_offset, int p_hsize)
photonRequest photon_setup_request_ledger_info(photonRILedgerEntry ri_entry, int curr, int proc)
int _photon_probe(photonAddr addr, int *flag, photonStatus status)
int _photon_try_rndv(int proc, void *ptr, uint64_t size, int tag, photon_rid *request, photonBufferHandle db)
#define PHOTON_LF_PTR(a)
int photon_request_init(photonConfig cfg)
#define PHOTON_ERROR_RESOURCE
Error code, resource not available.
Definition: photon.h:33
int photon_coll_init(photonConfig cfg)
Definition: photon_coll.c:41
photonEagerBuf local_eager_buf
int _photon_probe_ledger(int proc, int *flag, int type, photonStatus status)
int _photon_idsize
Definition: libphoton.c:63
photonEagerBuf remote_eager_buf
struct photon_rdma_ledger_t * photonLedger
Convenience pointer type for the private buffer structure.
Definition: photon.h:98
int __photon_wait_event(photonRequest req)
Definition: photon_event.c:418
int _photon_cancel(photon_rid request, int flags)
photonRILedger remote_snd_info_ledger
int _photon_register_buffer(void *addr, uint64_t size)
#define SENDRECV
#define BUFFER_FLAG_NIL
int buffertable_init(int size)
int _photon_io_init(char *file, int amode, void *view, int niter)
int photon_setup_pwc_ledger(ProcessInfo *photon_processes, char *buf, int num_entries)
#define PHOTON_EVENT_ERROR
Definition: photon_event.h:24
SLIST_HEAD(pendingmemregs, photon_mem_register_req)
photonConfig __photon_config
Definition: libphoton.c:48
int photon_pwc_init(photonConfig cfg)
Definition: photon_pwc.c:716
void buffertable_finalize()
int photon_setup_eager_ledger(ProcessInfo *photon_processes, char *buf, int num_entries)
int _photon_unregister_buffer(void *addr, uint64_t size)
int _photon_post_send_buffer_rdma(int proc, void *ptr, uint64_t size, int tag, photon_rid *request)
int __photon_wait_ledger(photonRequest req)
Definition: photon_event.c:337
int photon_setup_fin_ledger(ProcessInfo *photon_processes, char *buf, int num_entries)
int __photon_try_one_event(photonRequest *rreq)
Definition: photon_event.c:387
int _photon_wait_any_ledger(int *ret_proc, photon_rid *ret_req)
int photon_exchange_ledgers(ProcessInfo *processes, int flags)
#define PHOTON_EVENT_REQFOUND
Definition: photon_event.h:27
#define PHOTON_EVENT_OK
Definition: photon_event.h:23
#define REQUEST_COMPLETED
uintptr_t addr
The base address of the buffer.
Definition: photon.h:106
int _photon_test(photon_rid request, int *flag, int *type, photonStatus status)
int _photon_wait_send_buffer_rdma(int proc, uint64_t size, int tag, photon_rid *request)
int _photon_post_os_put_direct(int proc, void *ptr, uint64_t size, photonBuffer rbuf, int flags, photon_rid *request)
#define REQUEST_PENDING
#define REQUEST_OP_SENDREQ
photonLedger local_fin_ledger
struct photon_rdma_eager_buf_t * photonEagerBuf
#define REQUEST_FLAG_NIL