photon  1.1
photon_rdma_EAGER_buf.c
Go to the documentation of this file.
1 // =============================================================================
2 // Photon RDMA Library (libphoton)
3 //
4 // Copyright (c) 2016, Trustees of Indiana University,
5 // All rights reserved.
6 //
7 // This software may be modified and distributed under the terms of the BSD
8 // license. See the COPYING file for details.
9 //
10 // This software was created at the Indiana University Center for Research in
11 // Extreme Scale Technologies (CREST).
12 // =============================================================================
13 
14 #include <stdlib.h>
15 #include <string.h>
16 #include <stddef.h>
17 
18 #include "photon_rdma_EAGER_buf.h"
19 #include "photon_exchange.h"
20 #include "photon_event.h"
21 #include "logging.h"
22 
23 static int _get_remote_progress(int proc, photonEagerBuf buf);
24 
25 photonEagerBuf photon_rdma_eager_buf_create_reuse(uint8_t *eager_buffer, int size, int prefix) {
26  photonEagerBuf new;
27 
28  new = (struct photon_rdma_eager_buf_t *)(eager_buffer + PHOTON_EBUF_SSIZE(size) -
29  sizeof(struct photon_rdma_eager_buf_t));
30 
31  memset(new, 0, sizeof(struct photon_rdma_eager_buf_t));
32 
33  new->data = eager_buffer;
34  memset(new->data, 0, sizeof(uint8_t) * size);
35 
36  new->size = size;
37  new->prog = 0;
38  new->curr = 0;
39  new->tail = 0;
40  new->acct.rcur = 0;
41  new->acct.rloc = 0;
42  new->acct.event_prefix = prefix;
43 
44  return new;
45 }
46 
48  //free(buf);
49 }
50 
51 int photon_rdma_eager_buf_get_offset(int proc, photonEagerBuf buf, int size, int lim) {
52  uint64_t curr, new, left, tail, rcur, wrap;
53  int offset;
54  photonBackend be = photon_processes[proc].backend;
55 
56  int rc = be->tx_size_left(proc);
57  if (rc < 2) {
58  return -1;
59  }
60 
61  do {
62  rcur = sync_load(&buf->acct.rcur, SYNC_RELAXED);
63  curr = sync_load(&buf->curr, SYNC_RELAXED);
64  tail = sync_load(&buf->tail, SYNC_RELAXED);
65 
66  if ((curr - tail) >= buf->size) {
67  //log_err("Exceeded number of outstanding eager buf entries - increase size or wait for completion");
68  return -1;
69  }
70 
71  offset = curr & (buf->size - 1);
72  left = buf->size - offset;
73  if (left < lim) {
74  new = curr + left + size;
75  offset = 0;
76  // if we wrap, then make sure remote has progressed by same amount
77  wrap = left + size;
78  }
79  else {
80  new = curr + size;
81  wrap = 0;
82  }
83 
84  if (((curr - rcur) + size + wrap) >= buf->size) {
85  // receiver not ready, request an updated rcur
86  _get_remote_progress(proc, buf);
87  dbg_trace("No new offset until receiver catches up...");
88  return -2;
89  }
90  } while (!sync_cas(&buf->curr, &curr, new, SYNC_RELAXED, SYNC_RELAXED));
91 
92  if (left < lim)
93  sync_fadd(&buf->tail, left, SYNC_RELAXED);
94 
95  if (((curr - rcur) + size + wrap) >= (int)(buf->size * 0.8)) {
96  // pro-actively request remote progress
97  _get_remote_progress(proc, buf);
98  }
99 
100  return offset;
101 }
102 
103 static int _get_remote_progress(int proc, photonEagerBuf buf) {
104  int rc;
105  uint32_t rloc;
106  uint64_t cookie;
107  uintptr_t rmt_addr;
108  photonRequest req;
109  photonBackend be = photon_processes[proc].backend;
110 
111  rc = __photon_try_one_event(&req);
112  if (rc == PHOTON_EVENT_ERROR) {
113  dbg_err("Failure getting event");
114  return PHOTON_ERROR;
115  }
116 
117  rloc = 0;
118  if (!rloc && sync_cas(&buf->acct.rloc, &rloc, 1, SYNC_ACQUIRE, SYNC_RELAXED)) {
119 
120  dbg_trace("Fetching remote eager (%d) curr at rcur: %llu", proc, buf->acct.rcur);
121 
122  rmt_addr = buf->remote.addr + PHOTON_EBUF_SSIZE(buf->size) -
123  sizeof(struct photon_rdma_eager_buf_t) + offsetof(struct photon_rdma_eager_buf_t, prog);
124 
125  cookie = ( (uint64_t)buf->acct.event_prefix<<32) | proc;
126 
127  rc = be->rdma_get(proc, (uintptr_t)&buf->acct.rcur, rmt_addr, sizeof(buf->acct.rcur),
128  &(shared_storage->bint.buf), &buf->remote, cookie, 0);
129  if (rc != PHOTON_OK) {
130  dbg_err("RDMA GET for remote ledger progress counter failed");
131  return PHOTON_ERROR;
132  }
133  }
134 
135  return PHOTON_OK;
136 }
photonBufferHandle shared_storage
void photon_rdma_eager_buf_free(photonEagerBuf buf)
photonBackend backend
int photon_rdma_eager_buf_get_offset(int proc, photonEagerBuf buf, int size, int lim)
struct photon_req_t * photonRequest
ProcessInfo * photon_processes
#define PHOTON_ERROR
Error code, general error.
Definition: photon.h:32
uint64_t size
The size of the buffer in bytes.
Definition: photon.h:107
photonEagerBuf photon_rdma_eager_buf_create_reuse(uint8_t *eager_buffer, int size, int prefix)
#define PHOTON_OK
Photon success code.
Definition: photon.h:30
#define PHOTON_EBUF_SSIZE(c)
#define PHOTON_EVENT_ERROR
Definition: photon_event.h:24
int __photon_try_one_event(photonRequest *rreq)
Definition: photon_event.c:387
struct photon_rdma_eager_buf_t * photonEagerBuf