photon  1.1
photon_request.c
Go to the documentation of this file.
1 // =============================================================================
2 // Photon RDMA Library (libphoton)
3 //
4 // Copyright (c) 2016, Trustees of Indiana University,
5 // All rights reserved.
6 //
7 // This software may be modified and distributed under the terms of the BSD
8 // license. See the COPYING file for details.
9 //
10 // This software was created at the Indiana University Center for Research in
11 // Extreme Scale Technologies (CREST).
12 // =============================================================================
13 
14 #include <stdlib.h>
15 #include <string.h>
16 #include <assert.h>
17 
18 #include "photon_backend.h"
19 #include "photon_request.h"
20 #include "photon_pwc.h"
21 #include "util.h"
22 
23 static int __photon_cleanup_request(photonRequest req);
24 static int __photon_request_grow_table(photonRequestTable rt);
25 
26 int photon_request_init(photonConfig cfg) {
27  int i;
28  // Setup request tables
29  for (i = 0; i < (_photon_nproc + _photon_nforw); i++) {
30  photon_processes[i].request_table = malloc(sizeof(struct photon_req_table_t));
31  if (!photon_processes[i].request_table) {
32  log_err("Could not allocate request table for proc %d", i);
33  goto error_exit;
34  }
36  rt->count = 0;
37  rt->level = 0;
38  rt->next = 0;
39  rt->size = cfg->cap.default_rd;
40  rt->free = (uint32_t*)malloc(DEF_NR_LEVELS * sizeof(uint32_t));
41  rt->free[rt->level] = cfg->cap.default_rd;
42  rt->reqs = (photonRequest*)malloc(DEF_NR_LEVELS * sizeof(photonRequest));
43  if (!rt->reqs) {
44  log_err("Could not allocate request array for proc %d", i);
45  goto error_exit;
46  }
47  rt->reqs[rt->level] = (photonRequest)calloc(cfg->cap.default_rd, sizeof(struct photon_req_t));
48  if (!rt->reqs[rt->level]) {
49  log_err("Could not allocate request descriptors for proc %d", i);
50  goto error_exit;
51  }
52  rt->pwc_q = sync_two_lock_queue_new();
53  if (!rt->pwc_q) {
54  log_err("Could not allocate pwc request queue for proc %d", i);
55  goto error_exit;
56  }
57  rt->gwc_q = sync_two_lock_queue_new();
58  if (!rt->gwc_q) {
59  log_err("Could not allocate gwc request queue for proc %d", i);
60  goto error_exit;
61  }
62  rt->gwc_pwc_q = sync_two_lock_queue_new();
63  if (!rt->gwc_pwc_q) {
64  log_err("Could not allocate gwc-pwc request queue for proc %d", i);
65  goto error_exit;
66  }
67  rt->comp_q = sync_two_lock_queue_new();
68  if (!rt->comp_q) {
69  log_err("Could not allocate pwc completion queue for proc %d", i);
70  goto error_exit;
71  }
72  rt->pcount = 0;
73  rt->gcount = 0;
74  rt->gpcount = 0;
75  sync_tatas_init(&rt->tloc);
76  sync_tatas_init(&rt->pq_loc);
77  sync_tatas_init(&rt->gq_loc);
78  sync_tatas_init(&rt->pack_loc);
79  }
80 
81  return PHOTON_OK;
82 
83  error_exit:
84  return PHOTON_ERROR;
85 }
86 
87 
90  photonRequest req, reqs;
91  uint32_t rid;
92 
93  assert(IS_VALID_PROC(proc));
95 
96  sync_tatas_acquire(&rt->tloc);
97  {
98  if (!rt->free[rt->level]) {
99  dbg_trace("Request descriptors exhausted for proc %d, max=%u", proc, rt->size);
100  if (__photon_request_grow_table(rt) != PHOTON_OK) {
101  sync_tatas_release(&rt->tloc);
102  return NULL;
103  }
104  }
105 
106  // get our current request buffer level
107  reqs = rt->reqs[rt->level];
108 
109  // find the next free slot
110  rt->next++;
111  rt->next = (rt->next & (rt->size - 1));
112  while (reqs[rt->next].id) {
113  rt->next++;
114  rt->next = (rt->next & (rt->size - 1));
115  }
116  rt->count++;
117  rt->free[rt->level]--;
118 
119  req = &reqs[rt->next];
120  rid = (uint32_t)rt->level<<24;
121  rid |= (uint32_t)((rt->next) + 1);
122 
123  memset(req, 0, sizeof(struct photon_req_t));
124  req->id = PROC_REQUEST_ID(proc, rid);
125  req->state = REQUEST_NEW;
126  req->op = REQUEST_OP_DEFAULT;
127  req->flags = REQUEST_FLAG_NIL;
128  //bit_array_clear_all(req->mmask);
129 
130  dbg_trace("Returning a new request (count=%lu, free=%u) with id: 0x%016lx",
131  rt->count, rt->free[rt->level], req->id);
132  }
133  sync_tatas_release(&rt->tloc);
134 
135  return req;
136 }
137 
139  photonRequest req;
141  uint32_t proc, id;
142  uint16_t level;
143 
144  proc = (uint32_t)(rid>>32);
145  level = (uint16_t)(rid<<32>>56);
146  id = (uint32_t)(rid<<40>>40) - 1;
147 
148  assert(IS_VALID_PROC(proc));
149  rt = photon_processes[proc].request_table;
150  assert(id >= 0 && id < rt->size);
151 
152  sync_tatas_acquire(&rt->tloc);
153  {
154  req = &rt->reqs[level][id];
155  if (req->state == REQUEST_FREE) {
156  dbg_warn("Looking up a request that is freed, op=%d, type=%d, id=0x%016lx",
157  req->op, req->type, rid);
158  }
159  }
160  sync_tatas_release(&rt->tloc);
161 
162  return req;
163 }
164 
165 int photon_count_request(int proc) {
167  int i, j, start, end, count = 0;
168  if (proc == PHOTON_ANY_SOURCE) {
169  start = 0;
170  end = _photon_nproc;
171  }
172  else {
173  start = proc;
174  end = proc+1;
175  }
176  for (i=start; i<end; i++) {
178  sync_tatas_acquire(&rt->tloc);
179  {
180  for (j=(rt->level); j >= 0; j--) {
181  count += ((rt->size >> (rt->level-j)) - rt->free[j]);
182  }
183  }
184  sync_tatas_release(&rt->tloc);
185  }
186  return count;
187 }
188 
191  uint32_t proc = (uint32_t)(req->id>>32);
192  uint16_t level = (uint16_t)(req->id<<32>>56);
193  dbg_trace("Clearing request 0x%016lx", req->id);
194  __photon_cleanup_request(req);
195  rt = photon_processes[proc].request_table;
196  sync_tatas_acquire(&rt->tloc);
197  {
198  if (req->local_info.id.size) {
199  free(req->local_info.id.data);
200  }
201  req->id = NULL_REQUEST;
202  req->state = REQUEST_FREE;
203  rt->free[level]++;
204  }
205  sync_tatas_release(&rt->tloc);
206  return PHOTON_OK;
207 }
208 
209 photonRequest photon_setup_request_direct(photonBuffer lbuf, photonBuffer rbuf, uint64_t size,
210  int proc, int events) {
211  photonRequest req;
212 
213  req = photon_get_request(proc);
214  if (!req) {
215  log_err("Couldn't allocate request");
216  goto error_exit;
217  }
218 
219  dbg_trace("Setting up a direct request: %d/0x%016lx/%p", proc, req->id, req);
220 
221  req->state = REQUEST_PENDING;
222  req->type = EVQUEUE;
223  req->proc = proc;
224  req->size = size;
225  req->tag = 0;
226  req->rattr.events = events;
227  req->rattr.cookie = req->id;
228 
229  if (lbuf) {
230  memcpy(&req->local_info.buf, lbuf, sizeof(*lbuf));
231  req->local_info.id.u64 = 0;
232 
233  dbg_trace("Local info ...");
234  dbg_trace(" Addr: %p", (void *)lbuf->addr);
235  dbg_trace(" Size: %lu", lbuf->size);
236  dbg_trace(" Keys: 0x%016lx / 0x%016lx", lbuf->priv.key0, lbuf->priv.key1);
237  }
238 
239  if (rbuf) {
240  // fill in the internal buffer with the rbuf contents
241  memcpy(&req->remote_info.buf, rbuf, sizeof(*rbuf));
242  // there is no matching request from the remote side, so fill in local values */
243  req->remote_info.id.u64 = 0;
244 
245  dbg_trace("Remote info ...");
246  dbg_trace(" Addr: %p", (void *)rbuf->addr);
247  dbg_trace(" Size: %lu", rbuf->size);
248  dbg_trace(" Keys: 0x%016lx / 0x%016lx", rbuf->priv.key0, rbuf->priv.key1);
249  }
250 
251  return req;
252 
253  error_exit:
254  return NULL;
255 }
256 
258  photonRequest req;
259 
260  req = photon_get_request(proc);
261  if (!req) {
262  log_err("Couldn't allocate request");
263  goto error_exit;
264  }
265 
266  dbg_trace("Setting up a new send buffer request: %d/0x%016lx/%p", proc, req->id, req);
267 
268  req->state = REQUEST_NEW;
269  req->type = EVQUEUE;
270  req->proc = proc;
271  req->flags = ri_entry->flags;
272  req->size = ri_entry->size;
273  req->rattr.events = 1;
274 
275  // save the remote buffer in the request
276  req->remote_info.id.u64 = ri_entry->request;
277  req->remote_info.buf.addr = ri_entry->addr;
278  req->remote_info.buf.size = ri_entry->size;
279  req->remote_info.buf.priv = ri_entry->priv;
280 
281  dbg_trace("Remote request: 0x%016lx", ri_entry->request);
282  dbg_trace("Addr: %p", (void *)ri_entry->addr);
283  dbg_trace("Size: %lu", ri_entry->size);
284  dbg_trace("Tag: %d", ri_entry->tag);
285  dbg_trace("Keys: 0x%016lx / 0x%016lx", ri_entry->priv.key0, ri_entry->priv.key1);
286 
287  // reset the info ledger entry
288  ri_entry->header = 0;
289  ri_entry->footer = 0;
290 
291  return req;
292 
293  error_exit:
294  return NULL;
295 }
296 
298  photonRequest req;
299 
300  req = photon_get_request(proc);
301  if (!req) {
302  log_err("Couldn't allocate request");
303  goto error_exit;
304  }
305 
306  dbg_trace("Setting up a new eager buffer request: %d/0x%016lx/%p", proc, req->id, req);
307 
308  req->state = REQUEST_NEW;
309  req->type = EVQUEUE;
310  req->proc = proc;
311  req->flags = REQUEST_FLAG_EAGER;
312  req->size = (entry->request>>32);
313  req->rattr.events = 1;
314 
315  req->remote_info.buf.size = req->size;
316  req->remote_info.id.u64 = (( (uint64_t)_photon_myrank)<<32) | (entry->request<<32>>32);
317 
318  // reset the info ledger entry
319  entry->request = 0;
320 
321  return req;
322 
323  error_exit:
324  return NULL;
325 }
326 
327 /* generates a new request for the recv wr
328  calling this means we got an event for a corresponding post_recv()
329  we know the recv mbuf entry index
330  we inspected the UD hdr and determined the current sequence number
331  this setup method also returns the request pointer... */
332 photonRequest photon_setup_request_recv(photonAddr addr, int msn, int msize, int bindex, int nbufs) {
333  photonRequest req;
334 
335  req = photon_get_request(addr->global.proc_id);
336  if (!req) {
337  log_err("Couldn't allocate request");
338  goto error_exit;
339  }
340 
341  req->tag = PHOTON_ANY_TAG;
342  req->state = REQUEST_PENDING;
343  req->type = SENDRECV;
344  req->proc = addr->global.proc_id;
345  req->size = msize;
346  req->rattr.events = nbufs;
347  //req->bentries[msn] = bindex;
348  //memcpy(&req->addr, addr, sizeof(*addr));
349 
350  //bit_array_set(req->mmask, msn);
351 
352  return req;
353 
354  error_exit:
355  return NULL;
356 }
357 
359  photonRequest req;
360 
361  req = photon_get_request(addr->global.proc_id);
362  if (!req) {
363  log_err("Couldn't allocate request");
364  goto error_exit;
365  }
366 
367  req->tag = PHOTON_ANY_TAG;
368  req->state = REQUEST_PENDING;
369  req->type = SENDRECV;
370  req->size = 0;
371  req->rattr.events = nbufs;
372  //memcpy(&req->addr, addr, sizeof(*addr));
373  //memcpy(req->bentries, bufs, sizeof(int)*DEF_MAX_BUF_ENTRIES);
374 
375  return req;
376 
377  error_exit:
378  return NULL;
379 }
380 
// Per-operation cleanup run before a request descriptor is recycled.
// Dispatches on req->op and invokes MARK_DONE on the per-process resources
// the operation consumed; an unrecognised op is only logged.
// Always returns PHOTON_OK.
//
// NOTE(review): the embedded listing numbers skip 386, 389, 393, 398 and
// 407, so statements appear to be missing from the branches flagged below.
// Confirm against the original photon_request.c before relying on this text.
381 static int __photon_cleanup_request(photonRequest req) {
382  switch (req->op) {
383  case REQUEST_OP_SENDBUF:
384  if (req->flags & REQUEST_FLAG_EAGER) {
// eager send: mark req->size as done on the peer's eager buffer
385  MARK_DONE(photon_processes[req->proc].remote_eager_buf, req->size);
// NOTE(review): listing line 386 is missing here
387  }
388  else {
// NOTE(review): listing line 389 is missing; this branch is empty as shown
390  }
391  break;
392  case REQUEST_OP_SENDREQ:
// NOTE(review): listing line 393 is missing here
394  break;
395  case REQUEST_OP_SENDFIN:
396  break;
397  case REQUEST_OP_RECVBUF:
// NOTE(review): listing line 398 is missing here
399  break;
400  case REQUEST_OP_PWC:
401  if (req->flags & REQUEST_FLAG_1PWC) {
// a 1PWC request must carry a non-zero psize; that amount is marked done
// on the peer's pwc buffer
402  assert(req->psize);
403  MARK_DONE(photon_processes[req->proc].remote_pwc_buf, req->psize);
404  }
405  if ((req->flags & REQUEST_FLAG_2PWC) &&
406  !(req->flags & REQUEST_FLAG_NO_RCE)) {
// NOTE(review): listing line 407 is missing; this branch is empty as shown
408  }
409  break;
410  case REQUEST_OP_COLL:
411  break;
412  case REQUEST_OP_DEFAULT:
413  break;
414  default:
415  log_err("Tried to cleanup a request op we don't recognize: %d", req->op);
416  break;
417  }
418 
419  return PHOTON_OK;
420 }
421 
422 static int __photon_request_grow_table(photonRequestTable rt) {
423  uint64_t nsize = rt->size << 1;
424 
425  if ((rt->level + 1) >= DEF_NR_LEVELS) {
426  dbg_err("Exceeded max request table buffer allocations: %u", DEF_NR_LEVELS);
427  return PHOTON_ERROR;
428  }
429 
430  if (__photon_config->cap.max_rd && (nsize > __photon_config->cap.max_rd)) {
431  dbg_err("Exceeded max allowable request descriptors: %d",
432  __photon_config->cap.max_rd);
433  return PHOTON_ERROR;
434  }
435 
436  rt->level++;
437 
438  rt->reqs[rt->level] = (photonRequest)calloc(nsize, sizeof(struct photon_req_t));
439  if (!rt->reqs[rt->level]) {
440  dbg_err("Could not increase request table size to %lu", nsize);
441  return PHOTON_ERROR;
442  }
443 
444  rt->size = nsize;
445  rt->free[rt->level] = nsize;
446  rt->next = 0;
447 
448  dbg_trace("Resized request table: %lu (next: %lu)", nsize, rt->next);
449 
450  return PHOTON_OK;
451 }
photonLedger remote_pwc_ledger
photonRequest photon_lookup_request(photon_rid rid)
photonRequest photon_setup_request_ledger_eager(photonRDMALedgerEntry entry, int curr, int proc)
#define PHOTON_ANY_TAG
RNDV flag: return any tags.
Definition: photon.h:53
struct photon_ri_ledger_entry_t * photonRILedgerEntry
#define REQUEST_FLAG_EAGER
#define DEF_NR_LEVELS
int photon_count_request(int proc)
#define REQUEST_OP_RECVBUF
#define EVQUEUE
photonRequest photon_setup_request_direct(photonBuffer lbuf, photonBuffer rbuf, uint64_t size, int proc, int events)
photonLedger remote_eager_ledger
int _photon_nproc
Definition: libphoton.c:55
struct photon_rdma_ledger_entry_t * photonRDMALedgerEntry
photonRequestTable request_table
struct photon_req_table_t * photonRequestTable
uint64_t photon_rid
The Photon request ID.
Definition: photon.h:75
#define IS_VALID_PROC(p)
photonRILedger remote_rcv_info_ledger
#define PHOTON_ANY_SOURCE
RNDV and PWC flag: return completions from any source.
Definition: photon.h:54
int _photon_nforw
Definition: libphoton.c:56
#define REQUEST_FLAG_1PWC
#define REQUEST_OP_SENDFIN
union photon_addr_t * photonAddr
Convenience pointer type for the address union.
Definition: photon.h:112
#define MARK_DONE(e, s)
int _photon_myrank
Definition: libphoton.c:54
photonRequest photon_get_request(int proc)
#define REQUEST_OP_DEFAULT
struct photon_req_t * photonRequest
ProcessInfo * photon_processes
#define NULL_REQUEST
#define PHOTON_ERROR
Error code, general error.
Definition: photon.h:32
int photon_free_request(photonRequest req)
photonRequest photon_setup_request_recv(photonAddr addr, int msn, int msize, int bindex, int nbufs)
#define REQUEST_FREE
photonEagerBuf remote_pwc_buf
#define PHOTON_OK
Photon success code.
Definition: photon.h:30
#define REQUEST_NEW
#define REQUEST_OP_SENDBUF
photonRequest photon_setup_request_ledger_info(photonRILedgerEntry ri_entry, int curr, int proc)
#define REQUEST_OP_COLL
int photon_request_init(photonConfig cfg)
photonRequest photon_setup_request_send(photonAddr addr, int *bufs, int nbufs)
photonEagerBuf remote_eager_buf
#define REQUEST_FLAG_NO_RCE
photonRILedger remote_snd_info_ledger
#define SENDRECV
#define REQUEST_OP_PWC
photonConfig __photon_config
Definition: libphoton.c:48
#define REQUEST_FLAG_2PWC
#define PROC_REQUEST_ID(p, id)
#define REQUEST_PENDING
#define REQUEST_OP_SENDREQ
#define REQUEST_FLAG_NIL