Newer
Older
list_del(&mw->mw_all);
spin_unlock(&buf->rb_mwlock);
ia->ri_ops->ro_release_mr(mw);
count++;
spin_lock(&buf->rb_mwlock);
}
spin_unlock(&buf->rb_mwlock);
r_xprt->rx_stats.mrs_allocated = 0;
dprintk("RPC: %s: released %u MRs\n", __func__, count);
}
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
cancel_delayed_work_sync(&buf->rb_recovery_worker);
while (!list_empty(&buf->rb_recv_bufs)) {
struct rpcrdma_rep *rep;
rep = rpcrdma_buffer_get_rep_locked(buf);
rpcrdma_destroy_rep(rep);
spin_lock(&buf->rb_reqslock);
while (!list_empty(&buf->rb_allreqs)) {
req = list_first_entry(&buf->rb_allreqs,
struct rpcrdma_req, rl_all);
list_del(&req->rl_all);
spin_unlock(&buf->rb_reqslock);
rpcrdma_destroy_req(req);
spin_lock(&buf->rb_reqslock);
spin_unlock(&buf->rb_reqslock);
struct rpcrdma_mw *
rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_mw *mw = NULL;
if (!list_empty(&buf->rb_mws)) {
mw = list_first_entry(&buf->rb_mws,
struct rpcrdma_mw, mw_list);
list_del_init(&mw->mw_list);
out_nomws:
dprintk("RPC: %s: no MWs available\n", __func__);
schedule_delayed_work(&buf->rb_refresh_worker, 0);
/* Allow the reply handler and refresh worker to run */
cond_resched();
return NULL;
void
rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
list_add_tail(&mw->mw_list, &buf->rb_mws);
static struct rpcrdma_rep *
rpcrdma_buffer_get_rep(struct rpcrdma_buffer *buffers)
{
/* If an RPC previously completed without a reply (say, a
* credential problem or a soft timeout occurs) then hold off
* on supplying more Receive buffers until the number of new
* pending RPCs catches up to the number of posted Receives.
*/
if (unlikely(buffers->rb_send_count < buffers->rb_recv_count))
return NULL;
if (unlikely(list_empty(&buffers->rb_recv_bufs)))
return NULL;
buffers->rb_recv_count++;
return rpcrdma_buffer_get_rep_locked(buffers);
}
/*
* Get a set of request/reply buffers.
*
* Reply buffer (if available) is attached to send buffer upon return.
*/
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
struct rpcrdma_req *req;
spin_lock(&buffers->rb_lock);
if (list_empty(&buffers->rb_send_bufs))
goto out_reqbuf;
req = rpcrdma_buffer_get_req_locked(buffers);
req->rl_reply = rpcrdma_buffer_get_rep(buffers);
spin_unlock(&buffers->rb_lock);
spin_unlock(&buffers->rb_lock);
pr_warn("RPC: %s: out of request buffers\n", __func__);
}
/*
* Put request/reply buffers back into pool.
* Pre-decrement counter/array index.
*/
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
struct rpcrdma_buffer *buffers = req->rl_buffer;
struct rpcrdma_rep *rep = req->rl_reply;
req->rl_send_wr.num_sge = 0;
spin_lock(&buffers->rb_lock);
list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
if (rep) {
buffers->rb_recv_count--;
list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
spin_unlock(&buffers->rb_lock);
}
/*
* Recover reply buffers from pool.
* This happens when recovering from disconnect.
*/
void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
{
struct rpcrdma_buffer *buffers = req->rl_buffer;
spin_lock(&buffers->rb_lock);
req->rl_reply = rpcrdma_buffer_get_rep(buffers);
spin_unlock(&buffers->rb_lock);
}
/*
* Put reply buffers back into pool when not attached to
* request. This happens in error conditions.
*/
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
spin_lock(&buffers->rb_lock);
list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
spin_unlock(&buffers->rb_lock);
* rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers
* @size: size of buffer to be allocated, in bytes
* @direction: direction of data movement
* @flags: GFP flags
*
* Returns an ERR_PTR, or a pointer to a regbuf, a buffer that
* can be persistently DMA-mapped for I/O.
*
* xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
* receiving the payload of RDMA RECV operations. During Long Calls
* or Replies they may be registered externally via ro_map.
*/
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction,
gfp_t flags)
{
struct rpcrdma_regbuf *rb;
rb = kmalloc(sizeof(*rb) + size, flags);
if (rb == NULL)
return ERR_PTR(-ENOMEM);
rb->rg_device = NULL;
rb->rg_iov.length = size;
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
/**
* __rpcrdma_map_regbuf - DMA-map a regbuf
* @ia: controlling rpcrdma_ia
* @rb: regbuf to be mapped
*/
bool
__rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
if (rb->rg_direction == DMA_NONE)
return false;
rb->rg_iov.addr = ib_dma_map_single(ia->ri_device,
(void *)rb->rg_base,
rdmab_length(rb),
rb->rg_direction);
if (ib_dma_mapping_error(ia->ri_device, rdmab_addr(rb)))
return false;
rb->rg_device = ia->ri_device;
rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey;
return true;
}
static void
rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
{
if (!rpcrdma_regbuf_is_mapped(rb))
return;
ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb),
rdmab_length(rb), rb->rg_direction);
rb->rg_device = NULL;
}
/**
* rpcrdma_free_regbuf - deregister and free registered buffer
* @rb: regbuf to be deregistered and freed
*/
void
rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb)
if (!rb)
return;
rpcrdma_dma_unmap_regbuf(rb);
/*
* Prepost any receive buffer, then post send.
*
* Receive buffer is donated to hardware, reclaimed upon recv completion.
*/
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
struct rpcrdma_ep *ep,
struct rpcrdma_req *req)
{
struct ib_device *device = ia->ri_device;
struct ib_send_wr *send_wr = &req->rl_send_wr;
struct ib_send_wr *send_wr_fail;
struct ib_sge *sge = req->rl_send_iov;
if (req->rl_reply) {
rc = rpcrdma_ep_post_recv(ia, req->rl_reply);
req->rl_reply = NULL;
}
for (i = 0; i < send_wr->num_sge; i++)
ib_dma_sync_single_for_device(device, sge[i].addr,
sge[i].length, DMA_TO_DEVICE);
dprintk("RPC: %s: posting %d s/g entries\n",
__func__, send_wr->num_sge);
if (DECR_CQCOUNT(ep) > 0)
else { /* Provider must take a send completion every now and then */
INIT_CQCOUNT(ep);
send_wr->send_flags = IB_SEND_SIGNALED;
rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail);
goto out_postsend_err;
return 0;
out_postsend_err:
pr_err("rpcrdma: RDMA Send ib_post_send returned %i\n", rc);
return -ENOTCONN;
}
int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
struct rpcrdma_rep *rep)
{
struct ib_recv_wr recv_wr, *recv_wr_fail;
int rc;
recv_wr.next = NULL;
recv_wr.wr_cqe = &rep->rr_cqe;
recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
recv_wr.num_sge = 1;
if (!rpcrdma_dma_map_regbuf(ia, rep->rr_rdmabuf))
goto out_map;
rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
if (rc)
goto out_postrecv;
return 0;
out_map:
pr_err("rpcrdma: failed to DMA map the Receive buffer\n");
return -EIO;
out_postrecv:
pr_err("rpcrdma: ib_post_recv returned %i\n", rc);
return -ENOTCONN;
/**
* rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
* @r_xprt: transport associated with these backchannel resources
* @min_reqs: minimum number of incoming requests expected
*
* Returns zero if all requested buffers were posted, or a negative errno.
*/
int
rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
{
struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_rep *rep;
int rc;
while (count--) {
spin_lock(&buffers->rb_lock);
if (list_empty(&buffers->rb_recv_bufs))
goto out_reqbuf;
rep = rpcrdma_buffer_get_rep_locked(buffers);
spin_unlock(&buffers->rb_lock);
rc = rpcrdma_ep_post_recv(ia, rep);
if (rc)
goto out_rc;
}
return 0;
out_reqbuf:
spin_unlock(&buffers->rb_lock);
pr_warn("%s: no extra receive buffers\n", __func__);
return -ENOMEM;
out_rc:
rpcrdma_recv_buffer_put(rep);
return rc;
}