Newer
Older
rep = list_first_entry(&buf->rb_recv_bufs,
struct rpcrdma_rep, rr_list);
list_del(&rep->rr_list);
return rep;
}
static void
rpcrdma_destroy_rep(struct rpcrdma_rep *rep)
{
rpcrdma_free_regbuf(rep->rr_rdmabuf);
kfree(rep);
}
rpcrdma_destroy_req(struct rpcrdma_req *req)
{
rpcrdma_free_regbuf(req->rl_recvbuf);
rpcrdma_free_regbuf(req->rl_sendbuf);
rpcrdma_free_regbuf(req->rl_rdmabuf);
kfree(req);
}
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
static void
rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf)
{
struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
rx_buf);
struct rpcrdma_ia *ia = rdmab_to_ia(buf);
struct rpcrdma_mw *mw;
unsigned int count;
count = 0;
spin_lock(&buf->rb_mwlock);
while (!list_empty(&buf->rb_all)) {
mw = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
list_del(&mw->mw_all);
spin_unlock(&buf->rb_mwlock);
ia->ri_ops->ro_release_mr(mw);
count++;
spin_lock(&buf->rb_mwlock);
}
spin_unlock(&buf->rb_mwlock);
r_xprt->rx_stats.mrs_allocated = 0;
dprintk("RPC: %s: released %u MRs\n", __func__, count);
}
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
cancel_delayed_work_sync(&buf->rb_recovery_worker);
while (!list_empty(&buf->rb_recv_bufs)) {
struct rpcrdma_rep *rep;
rep = rpcrdma_buffer_get_rep_locked(buf);
rpcrdma_destroy_rep(rep);
spin_lock(&buf->rb_reqslock);
while (!list_empty(&buf->rb_allreqs)) {
req = list_first_entry(&buf->rb_allreqs,
struct rpcrdma_req, rl_all);
list_del(&req->rl_all);
spin_unlock(&buf->rb_reqslock);
rpcrdma_destroy_req(req);
spin_lock(&buf->rb_reqslock);
spin_unlock(&buf->rb_reqslock);
struct rpcrdma_mw *
rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_mw *mw = NULL;
if (!list_empty(&buf->rb_mws)) {
mw = list_first_entry(&buf->rb_mws,
struct rpcrdma_mw, mw_list);
list_del_init(&mw->mw_list);
out_nomws:
dprintk("RPC: %s: no MWs available\n", __func__);
schedule_delayed_work(&buf->rb_refresh_worker, 0);
/* Allow the reply handler and refresh worker to run */
cond_resched();
return NULL;
void
rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
list_add_tail(&mw->mw_list, &buf->rb_mws);
static struct rpcrdma_rep *
rpcrdma_buffer_get_rep(struct rpcrdma_buffer *buffers)
{
/* If an RPC previously completed without a reply (say, a
* credential problem or a soft timeout occurs) then hold off
* on supplying more Receive buffers until the number of new
* pending RPCs catches up to the number of posted Receives.
*/
if (unlikely(buffers->rb_send_count < buffers->rb_recv_count))
return NULL;
if (unlikely(list_empty(&buffers->rb_recv_bufs)))
return NULL;
buffers->rb_recv_count++;
return rpcrdma_buffer_get_rep_locked(buffers);
}
/*
* Get a set of request/reply buffers.
*
* Reply buffer (if available) is attached to send buffer upon return.
*/
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
struct rpcrdma_req *req;
spin_lock(&buffers->rb_lock);
if (list_empty(&buffers->rb_send_bufs))
goto out_reqbuf;
req = rpcrdma_buffer_get_req_locked(buffers);
req->rl_reply = rpcrdma_buffer_get_rep(buffers);
spin_unlock(&buffers->rb_lock);
spin_unlock(&buffers->rb_lock);
pr_warn("RPC: %s: out of request buffers\n", __func__);
}
/*
* Put request/reply buffers back into pool.
* Pre-decrement counter/array index.
*/
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
struct rpcrdma_buffer *buffers = req->rl_buffer;
struct rpcrdma_rep *rep = req->rl_reply;
req->rl_send_wr.num_sge = 0;
spin_lock(&buffers->rb_lock);
list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
if (rep) {
buffers->rb_recv_count--;
list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
spin_unlock(&buffers->rb_lock);
}
/*
* Recover reply buffers from pool.
* This happens when recovering from disconnect.
*/
void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
{
struct rpcrdma_buffer *buffers = req->rl_buffer;
spin_lock(&buffers->rb_lock);
req->rl_reply = rpcrdma_buffer_get_rep(buffers);
spin_unlock(&buffers->rb_lock);
}
/*
* Put reply buffers back into pool when not attached to
* request. This happens in error conditions.
*/
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
spin_lock(&buffers->rb_lock);
list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
spin_unlock(&buffers->rb_lock);
* rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers
* @size: size of buffer to be allocated, in bytes
* @direction: direction of data movement
* @flags: GFP flags
*
* Returns an ERR_PTR, or a pointer to a regbuf, a buffer that
* can be persistently DMA-mapped for I/O.
*
* xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
* receiving the payload of RDMA RECV operations. During Long Calls
* or Replies they may be registered externally via ro_map.
*/
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction,
gfp_t flags)
{
struct rpcrdma_regbuf *rb;
rb = kmalloc(sizeof(*rb) + size, flags);
if (rb == NULL)
return ERR_PTR(-ENOMEM);
rb->rg_device = NULL;
rb->rg_iov.length = size;
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
/**
* __rpcrdma_map_regbuf - DMA-map a regbuf
* @ia: controlling rpcrdma_ia
* @rb: regbuf to be mapped
*/
bool
__rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
if (rb->rg_direction == DMA_NONE)
return false;
rb->rg_iov.addr = ib_dma_map_single(ia->ri_device,
(void *)rb->rg_base,
rdmab_length(rb),
rb->rg_direction);
if (ib_dma_mapping_error(ia->ri_device, rdmab_addr(rb)))
return false;
rb->rg_device = ia->ri_device;
rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey;
return true;
}
static void
rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
{
if (!rpcrdma_regbuf_is_mapped(rb))
return;
ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb),
rdmab_length(rb), rb->rg_direction);
rb->rg_device = NULL;
}
/**
* rpcrdma_free_regbuf - deregister and free registered buffer
* @rb: regbuf to be deregistered and freed
*/
void
rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb)
if (!rb)
return;
rpcrdma_dma_unmap_regbuf(rb);
/*
* Prepost any receive buffer, then post send.
*
* Receive buffer is donated to hardware, reclaimed upon recv completion.
*/
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
struct rpcrdma_ep *ep,
struct rpcrdma_req *req)
{
struct ib_send_wr *send_wr = &req->rl_send_wr;
struct ib_send_wr *send_wr_fail;
if (req->rl_reply) {
rc = rpcrdma_ep_post_recv(ia, req->rl_reply);
req->rl_reply = NULL;
}
dprintk("RPC: %s: posting %d s/g entries\n",
__func__, send_wr->num_sge);
rpcrdma_set_signaled(ep, send_wr);
rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail);
goto out_postsend_err;
return 0;
out_postsend_err:
pr_err("rpcrdma: RDMA Send ib_post_send returned %i\n", rc);
return -ENOTCONN;
}
int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
struct rpcrdma_rep *rep)
{
struct ib_recv_wr *recv_wr_fail;
if (!rpcrdma_dma_map_regbuf(ia, rep->rr_rdmabuf))
goto out_map;
rc = ib_post_recv(ia->ri_id->qp, &rep->rr_recv_wr, &recv_wr_fail);
goto out_postrecv;
return 0;
out_map:
pr_err("rpcrdma: failed to DMA map the Receive buffer\n");
return -EIO;
out_postrecv:
pr_err("rpcrdma: ib_post_recv returned %i\n", rc);
return -ENOTCONN;
/**
* rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
* @r_xprt: transport associated with these backchannel resources
* @min_reqs: minimum number of incoming requests expected
*
* Returns zero if all requested buffers were posted, or a negative errno.
*/
int
rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
{
struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_rep *rep;
int rc;
while (count--) {
spin_lock(&buffers->rb_lock);
if (list_empty(&buffers->rb_recv_bufs))
goto out_reqbuf;
rep = rpcrdma_buffer_get_rep_locked(buffers);
spin_unlock(&buffers->rb_lock);
rc = rpcrdma_ep_post_recv(ia, rep);
if (rc)
goto out_rc;
}
return 0;
out_reqbuf:
spin_unlock(&buffers->rb_lock);
pr_warn("%s: no extra receive buffers\n", __func__);
return -ENOMEM;
out_rc:
rpcrdma_recv_buffer_put(rep);
return rc;
}