photon  1.1
photon_rdma_ledger.c
Go to the documentation of this file.
1 // =============================================================================
2 // Photon RDMA Library (libphoton)
3 //
4 // Copyright (c) 2016, Trustees of Indiana University,
5 // All rights reserved.
6 //
7 // This software may be modified and distributed under the terms of the BSD
8 // license. See the COPYING file for details.
9 //
10 // This software was created at the Indiana University Center for Research in
11 // Extreme Scale Technologies (CREST).
12 // =============================================================================
13 
14 #include <stdlib.h>
15 #include <string.h>
16 #include <stddef.h>
17 #include <time.h>
18 
19 #include "logging.h"
20 #include "photon_rdma_ledger.h"
21 #include "photon_buffer.h"
22 #include "photon_exchange.h"
23 #include "photon_event.h"
24 
25 static int _get_remote_progress(int proc, photonLedger buf);
26 
27 photonLedger photon_rdma_ledger_create_reuse(void *ledger_buffer, int num_entries,
28  int entry_size, int prefix) {
29  photonLedger new;
30 
31  new = (struct photon_rdma_ledger_t *)((uintptr_t)ledger_buffer +
32  PHOTON_LEDG_SSIZE(num_entries, entry_size) -
33  sizeof(struct photon_rdma_ledger_t));
34 
35  memset(new, 0, sizeof(struct photon_rdma_ledger_t));
36 
37  new->entries = ledger_buffer;
38 
39  // we bzero this out so that valgrind doesn't believe these are
40  // "uninitialized". They get filled in via RDMA so valgrind doesn't
41  // know that the values have been filled in
42  memset(new->entries, 0, entry_size * num_entries);
43 
44  new->prog = 0;
45  new->curr = 0;
46  new->tail = 0;
47  new->entry_size = entry_size;
48  new->num_entries = num_entries;
49  new->acct.rcur = 0;
50  new->acct.rloc = 0;
51  new->acct.event_prefix = prefix;
52 
53  return new;
54 }
55 
  // Nothing is released here.
  // NOTE(review): photon_rdma_ledger_create_reuse() places the ledger struct
  // inside the caller-supplied buffer, so calling free() on such a ledger
  // would not be valid -- presumably that is why the call below is disabled;
  // confirm that the buffer owner is responsible for releasing the memory.
  //free(ledger);
}
59 
  // Reserve the next ledger slot for a send to proc.
  // NOTE(review): the signature line is missing from this listing; the names
  // used below (proc, l) match photon_rdma_ledger_get_next(int proc, photonLedger l).
  //
  // Returns:
  //   slot index (curr masked by num_entries - 1) on success
  //   -1 when the backend reports fewer than 2 free TX slots, or when
  //      curr - tail has reached num_entries
  //   -2 when curr - rcur has reached num_entries (a refresh of rcur is
  //      requested before returning)
  uint64_t curr, tail, rcur;
  photonBackend be = photon_processes[proc].backend;

  // bail out early unless the backend reports at least 2 free TX slots
  // NOTE(review): presumably one for the ledger write and one for a possible
  // progress GET -- confirm
  int rc = be->tx_size_left(proc);
  if (rc < 2) {
    return -1;
  }

  // Lock-free reservation: take a snapshot of the counters, validate it, then
  // try to advance l->curr by one with a CAS; on CAS failure the snapshot is
  // retaken and re-validated.
  do {
    rcur = sync_load(&l->acct.rcur, SYNC_RELAXED);
    curr = sync_load(&l->curr, SYNC_RELAXED);
    tail = sync_load(&l->tail, SYNC_RELAXED);
    // unsigned difference: number of entries handed out but not yet past tail
    if ((curr - tail) >= l->num_entries) {
      //log_err("Exceeded number of outstanding ledger entries - increase ledger size or wait for completion");
      return -1;
    }
    // rcur is the last known remote progress value; a full window ahead of it
    // means no slot may be handed out yet
    if ((curr - rcur) >= (l->num_entries)) {
      // receiver not ready, request an updated rcur
      // (the return value of _get_remote_progress is not checked)
      _get_remote_progress(proc, l);
      dbg_trace("No new ledger entry until receiver catches up...");
      return -2;
    }
  } while (!sync_cas(&l->curr, &curr, curr+1, SYNC_RELAXED, SYNC_RELAXED));

  // exact-equality test: the refresh fires only for the reservation whose
  // distance to rcur is exactly 80% of num_entries (truncated to int)
  if ((curr - rcur) == (int)(l->num_entries * 0.8)) {
    // do a pro-active fetch of the remote ledger progress
    _get_remote_progress(proc, l);
  }

  // NOTE(review): the mask equals curr % num_entries only when num_entries is
  // a power of two -- confirm that ledger creation guarantees this
  return curr & (l->num_entries - 1);
}
92 
93 static int _get_remote_progress(int proc, photonLedger buf) {
94  int rc;
95  uint32_t rloc;
96  uint64_t cookie;
97  uintptr_t rmt_addr;
98  photonRequest req;
99  photonBackend be = photon_processes[proc].backend;
100 
101  rc = __photon_try_one_event(&req);
102  if (rc == PHOTON_EVENT_ERROR) {
103  dbg_err("Failure getting event");
104  return PHOTON_ERROR;
105  }
106 
107  rloc = 0;
108  if (sync_cas(&buf->acct.rloc, &rloc, 1, SYNC_ACQUIRE, SYNC_RELAXED)) {
109 
110  dbg_trace("Fetching remote ledger (%d) curr at rcur: %llu", proc, buf->acct.rcur);
111 
112  rmt_addr = buf->remote.addr + PHOTON_LEDG_SSIZE(buf->num_entries, PHOTON_CID_RECV_ENTRY_SIZE) -
113  sizeof(struct photon_rdma_ledger_t) + offsetof(struct photon_rdma_ledger_t, prog);
114 
115  cookie = ( (uint64_t)buf->acct.event_prefix<<32) | proc;
116 
117  rc = be->rdma_get(proc, (uintptr_t)&buf->acct.rcur, rmt_addr, sizeof(buf->acct.rcur),
118  &(shared_storage->bint.buf), &buf->remote, cookie, 0);
119  if (rc != PHOTON_OK) {
120  dbg_err("RDMA GET for remote ledger progress counter failed");
121  return PHOTON_ERROR;
122  }
123  }
124 
125  return PHOTON_OK;
126 }
photonLedger photon_rdma_ledger_create_reuse(void *ledger_buffer, int num_entries, int entry_size, int prefix)
photonBufferHandle shared_storage
#define PHOTON_LEDG_SSIZE(c, s)
photonBackend backend
struct photon_req_t * photonRequest
ProcessInfo * photon_processes
#define PHOTON_ERROR
Error code, general error.
Definition: photon.h:32
#define PHOTON_OK
Photon success code.
Definition: photon.h:30
int photon_rdma_ledger_get_next(int proc, photonLedger l)
struct photon_rdma_ledger_t * photonLedger
#define PHOTON_EVENT_ERROR
Definition: photon_event.h:24
#define PHOTON_CID_RECV_ENTRY_SIZE
int __photon_try_one_event(photonRequest *rreq)
Definition: photon_event.c:387
void photon_rdma_ledger_free(photonLedger ledger)