struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
				       gfp_t flags)
{
	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
	struct rpcrdma_req *req;

	req = kzalloc(sizeof(*req), flags);
	if (req == NULL)
		goto out1;

req->rl_sendbuf = rpcrdma_regbuf_alloc(size, DMA_TO_DEVICE, flags);
if (!req->rl_sendbuf)
goto out2;
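	/* Descriptive note (inferred from the DMA_NONE direction below):
	 * rl_recvbuf backs the upper layer's reply buffer; reply data is
	 * copied into it after arrival, so it is never DMA mapped.
	 */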
req->rl_recvbuf = rpcrdma_regbuf_alloc(size, DMA_NONE, flags);
if (!req->rl_recvbuf)
goto out3;
INIT_LIST_HEAD(&req->rl_free_mrs);
INIT_LIST_HEAD(&req->rl_registered);
spin_lock(&buffer->rb_lock);
list_add(&req->rl_all, &buffer->rb_allreqs);
spin_unlock(&buffer->rb_lock);
return req;

out3:
	kfree(req->rl_sendbuf);
out2:
kfree(req);
out1:
return NULL;
}
/**
 * rpcrdma_req_setup - Per-connection instance setup of an rpcrdma_req object
* @r_xprt: controlling transport instance
* @req: rpcrdma_req object to set up
 *
* Returns zero on success, and a negative errno on failure.
*/
int rpcrdma_req_setup(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
struct rpcrdma_regbuf *rb;
size_t maxhdrsize;
/* Compute maximum header buffer size in bytes */
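	/* That is, in XDR words: the fixed transport header fields, the
	 * three chunk list discriminators, and a worst-case Read list with
	 * one read segment per supported RDMA segment.
	 */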
maxhdrsize = rpcrdma_fixed_maxsz + 3 +
r_xprt->rx_ia.ri_max_rdma_segs * rpcrdma_readchunk_maxsz;
maxhdrsize *= sizeof(__be32);
rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize),
DMA_TO_DEVICE, GFP_KERNEL);
if (!rb)
goto out;
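	/* Per-connection setup: the connection's device is known here, so
	 * the transport header buffer can be DMA mapped right away. The
	 * send and receive buffers are mapped later, when first used.
	 */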
if (!__rpcrdma_regbuf_dma_map(r_xprt, rb))
goto out_free;
req->rl_rdmabuf = rb;
xdr_buf_init(&req->rl_hdrbuf, rdmab_data(rb), rdmab_length(rb));
return 0;
out_free:
rpcrdma_regbuf_free(rb);
out:
return -ENOMEM;
}
/* ASSUMPTION: the rb_allreqs list is stable for the duration,
* and thus can be walked without holding rb_lock. Eg. the
* caller is holding the transport send lock to exclude
* device removal or disconnection.
*/
static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_req *req;
int rc;
list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
rc = rpcrdma_req_setup(r_xprt, req);
if (rc)
return rc;
	}
	return 0;
}
static void rpcrdma_req_reset(struct rpcrdma_req *req)
{
/* Credits are valid for only one connection */
req->rl_slot.rq_cong = 0;
rpcrdma_regbuf_free(req->rl_rdmabuf);
req->rl_rdmabuf = NULL;
rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
rpcrdma_regbuf_dma_unmap(req->rl_recvbuf);
}
/* ASSUMPTION: the rb_allreqs list is stable for the duration,
* and thus can be walked without holding rb_lock. Eg. the
* caller is holding the transport send lock to exclude
* device removal or disconnection.
*/
static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_req *req;
list_for_each_entry(req, &buf->rb_allreqs, rl_all)
		rpcrdma_req_reset(req);
}

static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
bool temp)
{
struct rpcrdma_rep *rep;
rep = kzalloc(sizeof(*rep), GFP_KERNEL);
if (rep == NULL)
goto out;

	rep->rr_rdmabuf = rpcrdma_regbuf_alloc(r_xprt->rx_ep.rep_inline_recv,
					       DMA_FROM_DEVICE, GFP_KERNEL);
	if (!rep->rr_rdmabuf)
		goto out_free;

xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf),
rdmab_length(rep->rr_rdmabuf));
rep->rr_cqe.done = rpcrdma_wc_receive;
rep->rr_rxprt = r_xprt;
rep->rr_recv_wr.next = NULL;
rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
rep->rr_recv_wr.num_sge = 1;
	rep->rr_temp = temp;
	list_add(&rep->rr_all, &r_xprt->rx_buf.rb_all_reps);
	return rep;

out_free:
	kfree(rep);
out:
	return NULL;
}

static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
{
list_del(&rep->rr_all);
rpcrdma_regbuf_free(rep->rr_rdmabuf);
kfree(rep);
}
static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
{
struct llist_node *node;
/* Calls to llist_del_first are required to be serialized */
node = llist_del_first(&buf->rb_free_reps);
if (!node)
return NULL;
return llist_entry(node, struct rpcrdma_rep, rr_node);
}
static void rpcrdma_rep_put(struct rpcrdma_buffer *buf,
struct rpcrdma_rep *rep)
{
llist_add(&rep->rr_node, &buf->rb_free_reps);
}
static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_rep *rep;
list_for_each_entry(rep, &buf->rb_all_reps, rr_all)
rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf);
}
static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf)
{
struct rpcrdma_rep *rep;
while ((rep = rpcrdma_rep_get_locked(buf)) != NULL)
rpcrdma_rep_destroy(rep);
}
/**
* rpcrdma_buffer_create - Create initial set of req/rep objects
* @r_xprt: transport instance to (re)initialize
*
* Returns zero on success, otherwise a negative errno.
*/
int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	int i, rc;

buf->rb_bc_srv_max_requests = 0;
spin_lock_init(&buf->rb_lock);
	INIT_LIST_HEAD(&buf->rb_mrs);
	INIT_LIST_HEAD(&buf->rb_all_mrs);
INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker);
INIT_LIST_HEAD(&buf->rb_send_bufs);
INIT_LIST_HEAD(&buf->rb_allreqs);
INIT_LIST_HEAD(&buf->rb_all_reps);
	rc = -ENOMEM;
	for (i = 0; i < r_xprt->rx_xprt.max_reqs; i++) {
		struct rpcrdma_req *req;

		req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE * 2,
					 GFP_KERNEL);
		if (!req)
			goto out;
		list_add(&req->rl_list, &buf->rb_send_bufs);
	}

init_llist_head(&buf->rb_free_reps);
return 0;
out:
rpcrdma_buffer_destroy(buf);
return rc;
}
/**
* rpcrdma_req_destroy - Destroy an rpcrdma_req object
* @req: unused object to be destroyed
*
* Relies on caller holding the transport send lock to protect
* removing req->rl_all from buf->rb_all_reqs safely.
 */
void rpcrdma_req_destroy(struct rpcrdma_req *req)
{
	struct rpcrdma_mr *mr;

	list_del(&req->rl_all);

while ((mr = rpcrdma_mr_pop(&req->rl_free_mrs))) {
struct rpcrdma_buffer *buf = &mr->mr_xprt->rx_buf;
spin_lock(&buf->rb_lock);
list_del(&mr->mr_all);
spin_unlock(&buf->rb_lock);
frwr_release_mr(mr);
}
rpcrdma_regbuf_free(req->rl_recvbuf);
rpcrdma_regbuf_free(req->rl_sendbuf);
rpcrdma_regbuf_free(req->rl_rdmabuf);
kfree(req);
}
/**
* rpcrdma_mrs_destroy - Release all of a transport's MRs
* @r_xprt: controlling transport instance
*
* Relies on caller holding the transport send lock to protect
* removing mr->mr_list from req->rl_free_mrs safely.
*/
static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mr *mr;

	cancel_work_sync(&buf->rb_refresh_worker);

	spin_lock(&buf->rb_lock);
	while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
					      struct rpcrdma_mr,
					      mr_all)) != NULL) {
		list_del(&mr->mr_all);
		spin_unlock(&buf->rb_lock);
		frwr_release_mr(mr);
		spin_lock(&buf->rb_lock);
	}
	spin_unlock(&buf->rb_lock);
}

/**
* rpcrdma_buffer_destroy - Release all hw resources
* @buf: root control block for resources
*
* ORDERING: relies on a prior rpcrdma_xprt_drain :
* - No more Send or Receive completions can occur
* - All MRs, reps, and reqs are returned to their free lists
*/
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
rpcrdma_reps_destroy(buf);
	while (!list_empty(&buf->rb_send_bufs)) {
		struct rpcrdma_req *req;

		req = list_first_entry(&buf->rb_send_bufs,
				       struct rpcrdma_req, rl_list);
		list_del(&req->rl_list);
		rpcrdma_req_destroy(req);
	}
}

/**
* rpcrdma_mr_get - Allocate an rpcrdma_mr object
* @r_xprt: controlling transport
*
* Returns an initialized rpcrdma_mr or NULL if no free
* rpcrdma_mr objects are available.
*/
struct rpcrdma_mr *
rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mr *mr;

	spin_lock(&buf->rb_lock);
	mr = rpcrdma_mr_pop(&buf->rb_mrs);
	spin_unlock(&buf->rb_lock);
	return mr;
}

/**
 * rpcrdma_mr_put - DMA unmap an MR and release it
 * @mr: MR to release
 *
 */
void rpcrdma_mr_put(struct rpcrdma_mr *mr)
{
struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
if (mr->mr_dir != DMA_NONE) {
trace_xprtrdma_mr_unmap(mr);
ib_dma_unmap_sg(r_xprt->rx_ia.ri_id->device,
mr->mr_sg, mr->mr_nents, mr->mr_dir);
mr->mr_dir = DMA_NONE;
}
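	/* Return the MR to its owning req's free list so a later RPC using
	 * the same rpcrdma_req can reuse it.
	 */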
rpcrdma_mr_push(mr, &mr->mr_req->rl_free_mrs);
}
/**
* rpcrdma_buffer_get - Get a request buffer
* @buffers: Buffer pool from which to obtain a buffer
 *
 * Returns a fresh rpcrdma_req, or NULL if none are available.
*/
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
struct rpcrdma_req *req;
spin_lock(&buffers->rb_lock);
req = list_first_entry_or_null(&buffers->rb_send_bufs,
struct rpcrdma_req, rl_list);
if (req)
list_del_init(&req->rl_list);
	spin_unlock(&buffers->rb_lock);
	return req;
}

/**
* rpcrdma_buffer_put - Put request/reply buffers back into pool
 * @buffers: buffer pool
 * @req: object to return
 *
 */
void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
{
if (req->rl_reply)
		rpcrdma_rep_put(buffers, req->rl_reply);
	req->rl_reply = NULL;

spin_lock(&buffers->rb_lock);
list_add(&req->rl_list, &buffers->rb_send_bufs);
	spin_unlock(&buffers->rb_lock);
}

/**
* rpcrdma_recv_buffer_put - Release rpcrdma_rep back to free list
* @rep: rep to release
*
* Used after error conditions.
 */
void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
	rpcrdma_rep_put(&rep->rr_rxprt->rx_buf, rep);
}

/* Returns a pointer to a rpcrdma_regbuf object, or NULL.
*
* xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
* receiving the payload of RDMA RECV operations. During Long Calls
* or Replies they may be registered externally via frwr_map.
 */
static struct rpcrdma_regbuf *
rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
gfp_t flags)
{
struct rpcrdma_regbuf *rb;
rb = kmalloc(sizeof(*rb), flags);
if (!rb)
return NULL;
rb->rg_data = kmalloc(size, flags);
if (!rb->rg_data) {
kfree(rb);
return NULL;
}
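	/* The regbuf starts out unmapped (rg_device == NULL); it is DMA
	 * mapped later by rpcrdma_regbuf_dma_map() when first used for I/O.
	 */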
rb->rg_device = NULL;
	rb->rg_direction = direction;
	rb->rg_iov.length = size;
	return rb;
}

/**
* rpcrdma_regbuf_realloc - re-allocate a SEND/RECV buffer
* @rb: regbuf to reallocate
* @size: size of buffer to be allocated, in bytes
* @flags: GFP flags
*
* Returns true if reallocation was successful. If false is
* returned, @rb is left untouched.
*/
bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, gfp_t flags)
{
void *buf;
buf = kmalloc(size, flags);
if (!buf)
return false;
	rpcrdma_regbuf_dma_unmap(rb);
	kfree(rb->rg_data);

rb->rg_data = buf;
rb->rg_iov.length = size;
return true;
}
/**
 * __rpcrdma_regbuf_dma_map - DMA-map a regbuf
* @r_xprt: controlling transport instance
* @rb: regbuf to be mapped
*
* Returns true if the buffer is now DMA mapped to @r_xprt's device
 */
bool __rpcrdma_regbuf_dma_map(struct rpcrdma_xprt *r_xprt,
			      struct rpcrdma_regbuf *rb)
{
	struct ib_device *device = r_xprt->rx_ia.ri_id->device;

if (rb->rg_direction == DMA_NONE)
return false;
rb->rg_iov.addr = ib_dma_map_single(device, rdmab_data(rb),
rdmab_length(rb), rb->rg_direction);
if (ib_dma_mapping_error(device, rdmab_addr(rb))) {
		trace_xprtrdma_dma_maperr(rdmab_addr(rb));
		return false;
	}

rb->rg_device = device;
rb->rg_iov.lkey = r_xprt->rx_ia.ri_pd->local_dma_lkey;
return true;
}
static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb)
{
if (!rpcrdma_regbuf_is_mapped(rb))
return;
ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb), rdmab_length(rb),
rb->rg_direction);
	rb->rg_device = NULL;
}

static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb)
{
	if (rb)
		kfree(rb->rg_data);
	kfree(rb);
}

/**
* rpcrdma_ep_post - Post WRs to a transport's Send Queue
* @ia: transport's device information
* @ep: transport's RDMA endpoint information
* @req: rpcrdma_req containing the Send WR to post
 *
 * Returns 0 if the post was successful, otherwise -ENOTCONN
* is returned.
*/
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
struct rpcrdma_ep *ep,
struct rpcrdma_req *req)
{
	struct ib_send_wr *send_wr = &req->rl_wr;
	int rc;

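	/* Signal Send completions only every rep_send_batch posts, to limit
	 * completion overhead; a Send is also signaled when the req's kref
	 * shows another code path still holds a reference to it.
	 */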
if (!ep->rep_send_count || kref_read(&req->rl_kref) > 1) {
send_wr->send_flags |= IB_SEND_SIGNALED;
ep->rep_send_count = ep->rep_send_batch;
} else {
send_wr->send_flags &= ~IB_SEND_SIGNALED;
--ep->rep_send_count;
}
	rc = frwr_send(ia, req);
	trace_xprtrdma_post_send(req, rc);
	if (rc)
		return -ENOTCONN;
	return 0;
}

/**
* rpcrdma_post_recvs - Refill the Receive Queue
* @r_xprt: controlling transport instance
* @temp: mark Receive buffers to be deleted after use
*
*/
void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_ep *ep = &r_xprt->rx_ep;
struct ib_recv_wr *i, *wr, *bad_wr;
	struct rpcrdma_rep *rep;
	int needed, count, rc;

rc = 0;
count = 0;
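	/* Keep the Receive Queue provisioned for the advertised credit
	 * limit plus any expected backchannel traffic.
	 */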
needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
	if (likely(ep->rep_receive_count > needed))
		goto out;
	needed -= ep->rep_receive_count;
	if (!temp)
needed += RPCRDMA_MAX_RECV_BATCH;
/* fast path: all needed reps can be found on the free list */
wr = NULL;
while (needed) {
rep = rpcrdma_rep_get_locked(buf);
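		/* A rep marked rr_temp was meant to be released after one
		 * use (see @temp above); destroy it instead of reposting.
		 */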
if (rep && rep->rr_temp) {
rpcrdma_rep_destroy(rep);
continue;
}
		if (!rep)
			rep = rpcrdma_rep_create(r_xprt, temp);
		if (!rep)
			break;

rep->rr_recv_wr.next = wr;
wr = &rep->rr_recv_wr;
--needed;
	}
	if (!wr)
		goto out;

for (i = wr; i; i = i->next) {
rep = container_of(i, struct rpcrdma_rep, rr_recv_wr);
if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf))
goto release_wrs;
		trace_xprtrdma_post_recv(rep);
		++count;
	}

rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr,
(const struct ib_recv_wr **)&bad_wr);
out:
trace_xprtrdma_post_recvs(r_xprt, count, rc);
	if (rc) {
		for (wr = bad_wr; wr;) {
struct rpcrdma_rep *rep;
			rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
			wr = wr->next;
			rpcrdma_recv_buffer_put(rep);
--count;
}
}
	ep->rep_receive_count += count;
	return;

release_wrs:
for (i = wr; i;) {
rep = container_of(i, struct rpcrdma_rep, rr_recv_wr);
i = i->next;
rpcrdma_recv_buffer_put(rep);
	}
	goto out;
}