/**
 * rpcrdma_rep_put - Release rpcrdma_rep back to free list
 * @buf: buffer pool
 * @rep: rep to release
 *
 */
void rpcrdma_rep_put(struct rpcrdma_buffer *buf, struct rpcrdma_rep *rep)
{
	llist_add(&rep->rr_node, &buf->rb_free_reps);
}
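
/* Usage sketch (illustrative, not a call site from this file): reps are
 * recycled through the lock-free rb_free_reps llist, so rpcrdma_rep_put()
 * can be called from any context. A caller that pulled a rep it ends up
 * not needing simply pushes it back:
 *
 *	rep = rpcrdma_rep_get_locked(buf);
 *	if (rep)
 *		rpcrdma_rep_put(buf, rep);
 */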
/* Caller must ensure the QP is quiescent (RQ is drained) before
* invoking this function, to guarantee rb_all_reps is not
* changing.
*/
static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_rep *rep;
list_for_each_entry(rep, &buf->rb_all_reps, rr_all) {
rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf);
rep->rr_temp = true; /* Mark this rep for destruction */
	}
}
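
/* Ordering sketch (illustrative): per the comment above, the disconnect
 * path is expected to drain the QP before unmapping reps. The exact
 * caller is an assumption here; only the ordering is the point:
 *
 *	rpcrdma_xprt_drain(r_xprt);	// no more Receive completions
 *	rpcrdma_reps_unmap(r_xprt);	// rb_all_reps is now stable
 */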
static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf)
{
struct rpcrdma_rep *rep;
spin_lock(&buf->rb_lock);
while ((rep = list_first_entry_or_null(&buf->rb_all_reps,
struct rpcrdma_rep,
rr_all)) != NULL) {
list_del(&rep->rr_all);
spin_unlock(&buf->rb_lock);
rpcrdma_rep_free(rep);
spin_lock(&buf->rb_lock);
}
	spin_unlock(&buf->rb_lock);
}
/**
* rpcrdma_buffer_create - Create initial set of req/rep objects
* @r_xprt: transport instance to (re)initialize
*
* Returns zero on success, otherwise a negative errno.
*/
int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	int i, rc;

buf->rb_bc_srv_max_requests = 0;
spin_lock_init(&buf->rb_lock);
	INIT_LIST_HEAD(&buf->rb_mrs);
	INIT_LIST_HEAD(&buf->rb_all_mrs);
INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker);
INIT_LIST_HEAD(&buf->rb_send_bufs);
INIT_LIST_HEAD(&buf->rb_allreqs);
INIT_LIST_HEAD(&buf->rb_all_reps);

	rc = -ENOMEM;
	for (i = 0; i < r_xprt->rx_xprt.max_reqs; i++) {
struct rpcrdma_req *req;
		req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE * 2,
					 GFP_KERNEL);
		if (!req)
			goto out;
		list_add(&req->rl_list, &buf->rb_send_bufs);
	}

	init_llist_head(&buf->rb_free_reps);

return 0;
out:
rpcrdma_buffer_destroy(buf);
return rc;
}
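
/* Usage sketch (illustrative): buffer_create() and buffer_destroy() pair
 * over the life of a transport. On failure buffer_create() cleans up
 * after itself, so a caller only checks the return code:
 *
 *	rc = rpcrdma_buffer_create(r_xprt);
 *	if (rc)
 *		return rc;
 *	...
 *	rpcrdma_buffer_destroy(&r_xprt->rx_buf);
 */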
/**
* rpcrdma_req_destroy - Destroy an rpcrdma_req object
* @req: unused object to be destroyed
*
* Relies on caller holding the transport send lock to protect
 * removing req->rl_all from buf->rb_allreqs safely.
 */
void rpcrdma_req_destroy(struct rpcrdma_req *req)
{
	struct rpcrdma_mr *mr;

	list_del(&req->rl_all);

while ((mr = rpcrdma_mr_pop(&req->rl_free_mrs))) {
struct rpcrdma_buffer *buf = &mr->mr_xprt->rx_buf;
spin_lock(&buf->rb_lock);
list_del(&mr->mr_all);
		spin_unlock(&buf->rb_lock);

		frwr_mr_release(mr);
	}

	rpcrdma_regbuf_free(req->rl_recvbuf);
rpcrdma_regbuf_free(req->rl_sendbuf);
rpcrdma_regbuf_free(req->rl_rdmabuf);
kfree(req);
}
/**
* rpcrdma_mrs_destroy - Release all of a transport's MRs
* @r_xprt: controlling transport instance
*
* Relies on caller holding the transport send lock to protect
* removing mr->mr_list from req->rl_free_mrs safely.
*/
static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mr *mr;

	cancel_work_sync(&buf->rb_refresh_worker);

	spin_lock(&buf->rb_lock);
	while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
					      struct rpcrdma_mr,
					      mr_all)) != NULL) {
		list_del(&mr->mr_list);
		list_del(&mr->mr_all);
		spin_unlock(&buf->rb_lock);

		frwr_mr_release(mr);

		spin_lock(&buf->rb_lock);
	}
	spin_unlock(&buf->rb_lock);
}
/**
* rpcrdma_buffer_destroy - Release all hw resources
* @buf: root control block for resources
*
* ORDERING: relies on a prior rpcrdma_xprt_drain :
* - No more Send or Receive completions can occur
* - All MRs, reps, and reqs are returned to their free lists
*/
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
rpcrdma_reps_destroy(buf);
	while (!list_empty(&buf->rb_send_bufs)) {
		struct rpcrdma_req *req;

		req = list_first_entry(&buf->rb_send_bufs,
struct rpcrdma_req, rl_list);
list_del(&req->rl_list);
		rpcrdma_req_destroy(req);
	}
}
/**
* rpcrdma_mr_get - Allocate an rpcrdma_mr object
* @r_xprt: controlling transport
*
* Returns an initialized rpcrdma_mr or NULL if no free
* rpcrdma_mr objects are available.
*/
struct rpcrdma_mr *
rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mr *mr;

	spin_lock(&buf->rb_lock);
	mr = rpcrdma_mr_pop(&buf->rb_mrs);
	spin_unlock(&buf->rb_lock);
	return mr;
}
/**
* rpcrdma_reply_put - Put reply buffers back into pool
* @buffers: buffer pool
* @req: object to return
*
*/
void rpcrdma_reply_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
{
if (req->rl_reply) {
rpcrdma_rep_put(buffers, req->rl_reply);
req->rl_reply = NULL;
}
}
/**
* rpcrdma_buffer_get - Get a request buffer
 * @buffers: Buffer pool from which to obtain a buffer
 *
 * Returns a fresh rpcrdma_req, or NULL if none are available.
*/
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
struct rpcrdma_req *req;
spin_lock(&buffers->rb_lock);
req = list_first_entry_or_null(&buffers->rb_send_bufs,
struct rpcrdma_req, rl_list);
if (req)
list_del_init(&req->rl_list);
	spin_unlock(&buffers->rb_lock);
	return req;
}
/**
 * rpcrdma_buffer_put - Put request/reply buffers back into pool
 * @buffers: buffer pool
 * @req: object to return
 *
 */
void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
{
	rpcrdma_reply_put(buffers, req);

spin_lock(&buffers->rb_lock);
list_add(&req->rl_list, &buffers->rb_send_bufs);
	spin_unlock(&buffers->rb_lock);
}
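
/* Usage sketch (illustrative; the surrounding steps are placeholders, not
 * the transport's real call sites): buffer_get() and buffer_put() bracket
 * the life of a req for one RPC:
 *
 *	req = rpcrdma_buffer_get(buffers);
 *	if (!req)
 *		return -EAGAIN;
 *	... marshal and post the Send ...
 *	rpcrdma_buffer_put(buffers, req);
 */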
/* Returns a pointer to a rpcrdma_regbuf object, or NULL.
*
* xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
* receiving the payload of RDMA RECV operations. During Long Calls
 * or Replies they may be registered externally via frwr_map.
 */
static struct rpcrdma_regbuf *
rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
gfp_t flags)
{
struct rpcrdma_regbuf *rb;
rb = kmalloc(sizeof(*rb), flags);
if (!rb)
return NULL;
rb->rg_data = kmalloc(size, flags);
if (!rb->rg_data) {
kfree(rb);
return NULL;
}
	rb->rg_device = NULL;
	rb->rg_direction = direction;
	rb->rg_iov.length = size;

	return rb;
}
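
/* Usage sketch (illustrative; the size and GFP flags are placeholders): a
 * regbuf starts out unmapped and is released with rpcrdma_regbuf_free(),
 * defined later in this file:
 *
 *	rb = rpcrdma_regbuf_alloc(RPCRDMA_V1_DEF_INLINE_SIZE,
 *				  DMA_TO_DEVICE, GFP_KERNEL);
 *	if (!rb)
 *		return -ENOMEM;
 *	...
 *	rpcrdma_regbuf_free(rb);
 */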
/**
* rpcrdma_regbuf_realloc - re-allocate a SEND/RECV buffer
* @rb: regbuf to reallocate
* @size: size of buffer to be allocated, in bytes
* @flags: GFP flags
*
* Returns true if reallocation was successful. If false is
* returned, @rb is left untouched.
*/
bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, gfp_t flags)
{
void *buf;
buf = kmalloc(size, flags);
if (!buf)
return false;
	rpcrdma_regbuf_dma_unmap(rb);
	kfree(rb->rg_data);

rb->rg_data = buf;
rb->rg_iov.length = size;
return true;
}
/**
 * __rpcrdma_regbuf_dma_map - DMA-map a regbuf
* @r_xprt: controlling transport instance
* @rb: regbuf to be mapped
*
 * Returns true if the buffer is now DMA mapped to @r_xprt's device
 */
bool __rpcrdma_regbuf_dma_map(struct rpcrdma_xprt *r_xprt,
			      struct rpcrdma_regbuf *rb)
{
	struct ib_device *device = r_xprt->rx_ep->re_id->device;

if (rb->rg_direction == DMA_NONE)
return false;
rb->rg_iov.addr = ib_dma_map_single(device, rdmab_data(rb),
rdmab_length(rb), rb->rg_direction);
if (ib_dma_mapping_error(device, rdmab_addr(rb))) {
		trace_xprtrdma_dma_maperr(rdmab_addr(rb));
		return false;
	}

	rb->rg_device = device;
rb->rg_iov.lkey = r_xprt->rx_ep->re_pd->local_dma_lkey;
return true;
}
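
/* Usage sketch (illustrative): callers map lazily, checking
 * rpcrdma_regbuf_is_mapped() before paying for a DMA mapping. This shows
 * the pattern only; it is not necessarily the transport's real wrapper:
 *
 *	if (!rpcrdma_regbuf_is_mapped(rb) &&
 *	    !__rpcrdma_regbuf_dma_map(r_xprt, rb))
 *		return -EIO;
 */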
static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb)
{
	if (!rpcrdma_regbuf_is_mapped(rb))
return;
ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb), rdmab_length(rb),
rb->rg_direction);
	rb->rg_device = NULL;
}
static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb)
{
	if (rb)
		kfree(rb->rg_data);
	kfree(rb);
}

/**
* rpcrdma_post_recvs - Refill the Receive Queue
* @r_xprt: controlling transport instance
* @needed: current credit grant
* @temp: mark Receive buffers to be deleted after one use
*
*/
void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, int needed, bool temp)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	struct ib_recv_wr *wr, *bad_wr;
	struct rpcrdma_rep *rep;
	int count, rc;

rc = 0;
count = 0;
	if (likely(ep->re_receive_count > needed))
		goto out;
	needed -= ep->re_receive_count;
if (!temp)
needed += RPCRDMA_MAX_RECV_BATCH;
if (atomic_inc_return(&ep->re_receiving) > 1)
goto out;
/* fast path: all needed reps can be found on the free list */
wr = NULL;
while (needed) {
rep = rpcrdma_rep_get_locked(buf);
if (rep && rep->rr_temp) {
rpcrdma_rep_destroy(rep);
continue;
}
		if (!rep)
			rep = rpcrdma_rep_create(r_xprt, temp);
		if (!rep)
			break;

rep->rr_cid.ci_queue_id = ep->re_attr.recv_cq->res.id;
trace_xprtrdma_post_recv(rep);
rep->rr_recv_wr.next = wr;
wr = &rep->rr_recv_wr;
		++count;
		--needed;
	}
	if (!wr)
		goto out;

	rc = ib_post_recv(ep->re_id->qp, wr,
(const struct ib_recv_wr **)&bad_wr);
	if (rc) {
		trace_xprtrdma_post_recvs_err(r_xprt, rc);
for (wr = bad_wr; wr;) {
struct rpcrdma_rep *rep;
			rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
			wr = wr->next;
			rpcrdma_rep_put(buf, rep);
			--count;
		}
	}
if (atomic_dec_return(&ep->re_receiving) > 0)
complete(&ep->re_done);
out:
trace_xprtrdma_post_recvs(r_xprt, count);
	ep->re_receive_count += count;
	return;
}
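
/* Accounting sketch (illustrative numbers): with re_receive_count == 3 and
 * a credit grant of needed == 8, the loop above posts 8 - 3 = 5 Receives
 * (plus up to RPCRDMA_MAX_RECV_BATCH extra when @temp is false), and the
 * count actually posted is added back into re_receive_count at "out:".
 */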