32 log_err(
"Could not allocate request table for proc %d", i);
39 rt->size = cfg->cap.default_rd;
40 rt->free = (uint32_t*)malloc(
DEF_NR_LEVELS *
sizeof(uint32_t));
41 rt->free[rt->level] = cfg->cap.default_rd;
44 log_err(
"Could not allocate request array for proc %d", i);
47 rt->reqs[rt->level] = (
photonRequest)calloc(cfg->cap.default_rd,
sizeof(
struct photon_req_t));
48 if (!rt->reqs[rt->level]) {
49 log_err(
"Could not allocate request descriptors for proc %d", i);
52 rt->pwc_q = sync_two_lock_queue_new();
54 log_err(
"Could not allocate pwc request queue for proc %d", i);
57 rt->gwc_q = sync_two_lock_queue_new();
59 log_err(
"Could not allocate gwc request queue for proc %d", i);
62 rt->gwc_pwc_q = sync_two_lock_queue_new();
64 log_err(
"Could not allocate gwc-pwc request queue for proc %d", i);
67 rt->comp_q = sync_two_lock_queue_new();
69 log_err(
"Could not allocate pwc completion queue for proc %d", i);
75 sync_tatas_init(&rt->tloc);
76 sync_tatas_init(&rt->pq_loc);
77 sync_tatas_init(&rt->gq_loc);
78 sync_tatas_init(&rt->pack_loc);
96 sync_tatas_acquire(&rt->tloc);
98 if (!rt->free[rt->level]) {
99 dbg_trace(
"Request descriptors exhausted for proc %d, max=%u", proc, rt->size);
100 if (__photon_request_grow_table(rt) !=
PHOTON_OK) {
101 sync_tatas_release(&rt->tloc);
107 reqs = rt->reqs[rt->level];
111 rt->next = (rt->next & (rt->size - 1));
112 while (reqs[rt->next].id) {
114 rt->next = (rt->next & (rt->size - 1));
117 rt->free[rt->level]--;
119 req = &reqs[rt->next];
120 rid = (uint32_t)rt->level<<24;
121 rid |= (uint32_t)((rt->next) + 1);
123 memset(req, 0,
sizeof(
struct photon_req_t));
130 dbg_trace(
"Returning a new request (count=%lu, free=%u) with id: 0x%016lx",
131 rt->count, rt->free[rt->level], req->id);
133 sync_tatas_release(&rt->tloc);
144 proc = (uint32_t)(rid>>32);
145 level = (uint16_t)(rid<<32>>56);
146 id = (uint32_t)(rid<<40>>40) - 1;
150 assert(
id >= 0 && id < rt->size);
152 sync_tatas_acquire(&rt->tloc);
154 req = &rt->reqs[level][id];
156 dbg_warn(
"Looking up a request that is freed, op=%d, type=%d, id=0x%016lx",
157 req->op, req->type, rid);
160 sync_tatas_release(&rt->tloc);
167 int i, j, start, end, count = 0;
176 for (i=start; i<end; i++) {
178 sync_tatas_acquire(&rt->tloc);
180 for (j=(rt->level); j >= 0; j--) {
181 count += ((rt->size >> (rt->level-j)) - rt->free[j]);
184 sync_tatas_release(&rt->tloc);
191 uint32_t proc = (uint32_t)(req->id>>32);
192 uint16_t level = (uint16_t)(req->id<<32>>56);
193 dbg_trace(
"Clearing request 0x%016lx", req->id);
194 __photon_cleanup_request(req);
196 sync_tatas_acquire(&rt->tloc);
198 if (req->local_info.id.size) {
199 free(req->local_info.id.data);
205 sync_tatas_release(&rt->tloc);
210 int proc,
int events) {
215 log_err(
"Couldn't allocate request");
219 dbg_trace(
"Setting up a direct request: %d/0x%016lx/%p", proc, req->id, req);
226 req->rattr.events = events;
227 req->rattr.cookie = req->id;
230 memcpy(&req->local_info.buf, lbuf,
sizeof(*lbuf));
231 req->local_info.id.u64 = 0;
233 dbg_trace(
"Local info ...");
234 dbg_trace(
" Addr: %p", (
void *)lbuf->addr);
235 dbg_trace(
" Size: %lu", lbuf->size);
236 dbg_trace(
" Keys: 0x%016lx / 0x%016lx", lbuf->priv.key0, lbuf->priv.key1);
241 memcpy(&req->remote_info.buf, rbuf,
sizeof(*rbuf));
243 req->remote_info.id.u64 = 0;
245 dbg_trace(
"Remote info ...");
246 dbg_trace(
" Addr: %p", (
void *)rbuf->addr);
247 dbg_trace(
" Size: %lu", rbuf->size);
248 dbg_trace(
" Keys: 0x%016lx / 0x%016lx", rbuf->priv.key0, rbuf->priv.key1);
262 log_err(
"Couldn't allocate request");
266 dbg_trace(
"Setting up a new send buffer request: %d/0x%016lx/%p", proc, req->id, req);
271 req->flags = ri_entry->flags;
272 req->size = ri_entry->size;
273 req->rattr.events = 1;
276 req->remote_info.id.u64 = ri_entry->request;
277 req->remote_info.buf.addr = ri_entry->addr;
278 req->remote_info.buf.size = ri_entry->size;
279 req->remote_info.buf.priv = ri_entry->priv;
281 dbg_trace(
"Remote request: 0x%016lx", ri_entry->request);
282 dbg_trace(
"Addr: %p", (
void *)ri_entry->addr);
283 dbg_trace(
"Size: %lu", ri_entry->size);
284 dbg_trace(
"Tag: %d", ri_entry->tag);
285 dbg_trace(
"Keys: 0x%016lx / 0x%016lx", ri_entry->priv.key0, ri_entry->priv.key1);
288 ri_entry->header = 0;
289 ri_entry->footer = 0;
302 log_err(
"Couldn't allocate request");
306 dbg_trace(
"Setting up a new eager buffer request: %d/0x%016lx/%p", proc, req->id, req);
312 req->size = (entry->request>>32);
313 req->rattr.events = 1;
315 req->remote_info.buf.size = req->size;
316 req->remote_info.id.u64 = (( (uint64_t)
_photon_myrank)<<32) | (entry->request<<32>>32);
337 log_err(
"Couldn't allocate request");
344 req->proc = addr->global.proc_id;
346 req->rattr.events = nbufs;
363 log_err(
"Couldn't allocate request");
371 req->rattr.events = nbufs;
415 log_err(
"Tried to cleanup a request op we don't recognize: %d", req->op);
423 uint64_t nsize = rt->size << 1;
426 dbg_err(
"Exceeded max request table buffer allocations: %u",
DEF_NR_LEVELS);
431 dbg_err(
"Exceeded max allowable request descriptors: %d",
438 rt->reqs[rt->level] = (
photonRequest)calloc(nsize,
sizeof(
struct photon_req_t));
439 if (!rt->reqs[rt->level]) {
440 dbg_err(
"Could not increase request table size to %lu", nsize);
445 rt->free[rt->level] = nsize;
448 dbg_trace(
"Resized request table: %lu (next: %lu)", nsize, rt->next);
photonLedger remote_pwc_ledger
photonRequest photon_lookup_request(photon_rid rid)
photonRequest photon_setup_request_ledger_eager(photonRDMALedgerEntry entry, int curr, int proc)
#define PHOTON_ANY_TAG
RNDV flag: return any tags.
struct photon_ri_ledger_entry_t * photonRILedgerEntry
#define REQUEST_FLAG_EAGER
int photon_count_request(int proc)
#define REQUEST_OP_RECVBUF
photonRequest photon_setup_request_direct(photonBuffer lbuf, photonBuffer rbuf, uint64_t size, int proc, int events)
photonLedger remote_eager_ledger
struct photon_rdma_ledger_entry_t * photonRDMALedgerEntry
photonRequestTable request_table
struct photon_req_table_t * photonRequestTable
uint64_t photon_rid
The Photon request ID.
photonRILedger remote_rcv_info_ledger
#define PHOTON_ANY_SOURCE
RNDV and PWC flag: return completions from any source.
#define REQUEST_FLAG_1PWC
#define REQUEST_OP_SENDFIN
union photon_addr_t * photonAddr
Convenience pointer type for the address union.
photonRequest photon_get_request(int proc)
#define REQUEST_OP_DEFAULT
struct photon_req_t * photonRequest
ProcessInfo * photon_processes
#define PHOTON_ERROR
Error code, general error.
int photon_free_request(photonRequest req)
photonRequest photon_setup_request_recv(photonAddr addr, int msn, int msize, int bindex, int nbufs)
photonEagerBuf remote_pwc_buf
#define PHOTON_OK
Photon success code.
#define REQUEST_OP_SENDBUF
photonRequest photon_setup_request_ledger_info(photonRILedgerEntry ri_entry, int curr, int proc)
int photon_request_init(photonConfig cfg)
photonRequest photon_setup_request_send(photonAddr addr, int *bufs, int nbufs)
photonEagerBuf remote_eager_buf
#define REQUEST_FLAG_NO_RCE
photonRILedger remote_snd_info_ledger
photonConfig __photon_config
#define REQUEST_FLAG_2PWC
#define PROC_REQUEST_ID(p, id)
#define REQUEST_OP_SENDREQ