out_free_regbuf:
	rpcrdma_regbuf_free(rep->rr_rdmabuf);
	kfree(rep);
	return NULL;
}

static void rpcrdma_rep_free(struct rpcrdma_rep *rep)
{
	rpcrdma_regbuf_free(rep->rr_rdmabuf);
	kfree(rep);
}

static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
{
	struct rpcrdma_buffer *buf = &rep->rr_rxprt->rx_buf;

	spin_lock(&buf->rb_lock);
	list_del(&rep->rr_all);
	spin_unlock(&buf->rb_lock);

	rpcrdma_rep_free(rep);
}

static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
{
	struct llist_node *node;

	/* Calls to llist_del_first are required to be serialized */
	node = llist_del_first(&buf->rb_free_reps);
	if (!node)
		return NULL;
	return llist_entry(node, struct rpcrdma_rep, rr_node);
}

static void rpcrdma_rep_put(struct rpcrdma_buffer *buf,
			    struct rpcrdma_rep *rep)
{
	llist_add(&rep->rr_node, &buf->rb_free_reps);
}
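
/* The rep free list (rb_free_reps) is a lock-free llist: producers
 * may call rpcrdma_rep_put() concurrently from completion contexts,
 * but llist_del_first() tolerates only one consumer at a time. The
 * single consumer here is rpcrdma_post_recvs(), which serializes
 * itself with the ep->re_receiving atomic (see below).
 */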

/* Caller must ensure the QP is quiescent (RQ is drained) before
 * invoking this function, to guarantee rb_all_reps is not
 * changing.
 */
static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_rep *rep;

	list_for_each_entry(rep, &buf->rb_all_reps, rr_all) {
		rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf);
		rep->rr_temp = true;	/* Mark this rep for destruction */
	}
}

static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_rep *rep;

	spin_lock(&buf->rb_lock);
	while ((rep = list_first_entry_or_null(&buf->rb_all_reps,
					       struct rpcrdma_rep,
					       rr_all)) != NULL) {
		list_del(&rep->rr_all);
		spin_unlock(&buf->rb_lock);

		rpcrdma_rep_free(rep);

		spin_lock(&buf->rb_lock);
	}
	spin_unlock(&buf->rb_lock);
}

/**
 * rpcrdma_buffer_create - Create initial set of req/rep objects
 * @r_xprt: transport instance to (re)initialize
 *
 * Returns zero on success, otherwise a negative errno.
 */
int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	int i, rc;

	buf->rb_bc_srv_max_requests = 0;
	spin_lock_init(&buf->rb_lock);
	INIT_LIST_HEAD(&buf->rb_mrs);
	INIT_LIST_HEAD(&buf->rb_all_mrs);
	INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker);

	INIT_LIST_HEAD(&buf->rb_send_bufs);
	INIT_LIST_HEAD(&buf->rb_allreqs);
	INIT_LIST_HEAD(&buf->rb_all_reps);

	rc = -ENOMEM;
	for (i = 0; i < r_xprt->rx_xprt.max_reqs; i++) {
		struct rpcrdma_req *req;

		req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE * 2,
					 GFP_KERNEL);
		if (!req)
			goto out;
		list_add(&req->rl_list, &buf->rb_send_bufs);
	}

	init_llist_head(&buf->rb_free_reps);

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
	return rc;
}

/**
 * rpcrdma_req_destroy - Destroy an rpcrdma_req object
 * @req: unused object to be destroyed
 *
 * Relies on caller holding the transport send lock to protect
 * removing req->rl_all from buf->rb_all_reqs safely.
 */
void rpcrdma_req_destroy(struct rpcrdma_req *req)
{
	struct rpcrdma_mr *mr;

	list_del(&req->rl_all);

	while ((mr = rpcrdma_mr_pop(&req->rl_free_mrs))) {
		struct rpcrdma_buffer *buf = &mr->mr_xprt->rx_buf;

		spin_lock(&buf->rb_lock);
		list_del(&mr->mr_all);
		spin_unlock(&buf->rb_lock);

		frwr_release_mr(mr);
	}
	rpcrdma_regbuf_free(req->rl_recvbuf);
	rpcrdma_regbuf_free(req->rl_sendbuf);
	rpcrdma_regbuf_free(req->rl_rdmabuf);
	kfree(req);
}

/**
 * rpcrdma_mrs_destroy - Release all of a transport's MRs
 * @r_xprt: controlling transport instance
 *
 * Relies on caller holding the transport send lock to protect
 * removing mr->mr_list from req->rl_free_mrs safely.
 */
static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mr *mr;

	cancel_work_sync(&buf->rb_refresh_worker);

	spin_lock(&buf->rb_lock);
	while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
					      struct rpcrdma_mr,
					      mr_all)) != NULL) {
		list_del(&mr->mr_list);
		list_del(&mr->mr_all);
		spin_unlock(&buf->rb_lock);

		frwr_release_mr(mr);

		spin_lock(&buf->rb_lock);
	}
	spin_unlock(&buf->rb_lock);
}

/**
 * rpcrdma_buffer_destroy - Release all hw resources
 * @buf: root control block for resources
 *
 * ORDERING: relies on a prior rpcrdma_xprt_drain:
 * - No more Send or Receive completions can occur
 * - All MRs, reps, and reqs are returned to their free lists
 */
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	rpcrdma_reps_destroy(buf);

	while (!list_empty(&buf->rb_send_bufs)) {
		struct rpcrdma_req *req;

		req = list_first_entry(&buf->rb_send_bufs,
				       struct rpcrdma_req, rl_list);
		list_del(&req->rl_list);
		rpcrdma_req_destroy(req);
	}
}
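
/* A sketch of the teardown ordering this relies on (assuming the
 * usual disconnect/destroy caller sequence):
 *
 *	rpcrdma_xprt_drain(r_xprt);	(quiesce the QP)
 *	rpcrdma_reps_unmap(r_xprt);	(DMA-unmap posted Receives)
 *	rpcrdma_mrs_destroy(r_xprt);	(release all MRs)
 *	...
 *	rpcrdma_buffer_destroy(buf);	(free remaining reps and reqs)
 */
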
/**
 * rpcrdma_mr_get - Allocate an rpcrdma_mr object
 * @r_xprt: controlling transport
 *
 * Returns an initialized rpcrdma_mr or NULL if no free
 * rpcrdma_mr objects are available.
 */
struct rpcrdma_mr *
rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mr *mr;

	spin_lock(&buf->rb_lock);
	mr = rpcrdma_mr_pop(&buf->rb_mrs);
	spin_unlock(&buf->rb_lock);
	return mr;
}

/**
 * rpcrdma_buffer_get - Get a request buffer
 * @buffers: Buffer pool from which to obtain a buffer
 *
 * Returns a fresh rpcrdma_req, or NULL if none are available.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;

	spin_lock(&buffers->rb_lock);
	req = list_first_entry_or_null(&buffers->rb_send_bufs,
				       struct rpcrdma_req, rl_list);
	if (req)
		list_del_init(&req->rl_list);
	spin_unlock(&buffers->rb_lock);
	return req;
}

/**
 * rpcrdma_buffer_put - Put request/reply buffers back into pool
 * @buffers: buffer pool
 * @req: object to return
 */
void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
{
	if (req->rl_reply)
		rpcrdma_rep_put(buffers, req->rl_reply);
	req->rl_reply = NULL;

	spin_lock(&buffers->rb_lock);
	list_add(&req->rl_list, &buffers->rb_send_bufs);
	spin_unlock(&buffers->rb_lock);
}

/**
 * rpcrdma_recv_buffer_put - Release rpcrdma_rep back to free list
 * @rep: rep to release
 *
 * Used after error conditions.
 */
void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
	rpcrdma_rep_put(&rep->rr_rxprt->rx_buf, rep);
}

/* Returns a pointer to a rpcrdma_regbuf object, or NULL.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. During Long Calls
 * or Replies they may be registered externally via frwr_map.
 */
static struct rpcrdma_regbuf *
rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
		     gfp_t flags)
{
	struct rpcrdma_regbuf *rb;

	rb = kmalloc(sizeof(*rb), flags);
	if (!rb)
		return NULL;
	rb->rg_data = kmalloc(size, flags);
	if (!rb->rg_data) {
		kfree(rb);
		return NULL;
	}
	rb->rg_device = NULL;
	rb->rg_direction = direction;
	rb->rg_iov.length = size;
	return rb;
}

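/* Regbuf lifecycle, in this file: a regbuf is allocated above with
 * no DMA mapping; it is mapped lazily by __rpcrdma_regbuf_dma_map()
 * (below) the first time the buffer is posted, and is unmapped and
 * freed by rpcrdma_regbuf_free(). Lazy mapping means allocation can
 * happen before a device/connection exists.
 */
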
/**
 * rpcrdma_regbuf_realloc - re-allocate a SEND/RECV buffer
 * @rb: regbuf to reallocate
 * @size: size of buffer to be allocated, in bytes
 * @flags: GFP flags
 *
 * Returns true if reallocation was successful. If false is
 * returned, @rb is left untouched.
 */
bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, gfp_t flags)
{
	void *buf;

	buf = kmalloc(size, flags);
	if (!buf)
		return false;

	rpcrdma_regbuf_dma_unmap(rb);
	kfree(rb->rg_data);

	rb->rg_data = buf;
	rb->rg_iov.length = size;
	return true;
}
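
/* A typical caller pattern (a sketch; the real call sites live in
 * rpc_rdma.c): grow a Send or Receive buffer only when the next
 * request no longer fits, and keep the existing regbuf otherwise.
 *
 *	if (unlikely(rdmab_length(req->rl_sendbuf) < size)) {
 *		if (!rpcrdma_regbuf_realloc(req->rl_sendbuf, size, flags))
 *			return false;
 *	}
 */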

/**
 * __rpcrdma_regbuf_dma_map - DMA-map a regbuf
 * @r_xprt: controlling transport instance
 * @rb: regbuf to be mapped
 *
 * Returns true if the buffer is now DMA mapped to @r_xprt's device
 */
bool __rpcrdma_regbuf_dma_map(struct rpcrdma_xprt *r_xprt,
			      struct rpcrdma_regbuf *rb)
{
	struct ib_device *device = r_xprt->rx_ep->re_id->device;

	if (rb->rg_direction == DMA_NONE)
		return false;

	rb->rg_iov.addr = ib_dma_map_single(device, rdmab_data(rb),
					    rdmab_length(rb), rb->rg_direction);
	if (ib_dma_mapping_error(device, rdmab_addr(rb))) {
		trace_xprtrdma_dma_maperr(rdmab_addr(rb));
		return false;
	}

	rb->rg_device = device;
	rb->rg_iov.lkey = r_xprt->rx_ep->re_pd->local_dma_lkey;
	return true;
}

static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb)
{
	if (!rb)
		return;

	if (!rpcrdma_regbuf_is_mapped(rb))
		return;

	ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb), rdmab_length(rb),
			    rb->rg_direction);
	rb->rg_device = NULL;
}

static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb)
{
	rpcrdma_regbuf_dma_unmap(rb);
	if (rb)
		kfree(rb->rg_data);
	kfree(rb);
}

/**
 * rpcrdma_post_sends - Post WRs to a transport's Send Queue
 * @r_xprt: controlling transport instance
 * @req: rpcrdma_req containing the Send WR to post
 *
 * Returns 0 if the post was successful, otherwise -ENOTCONN
 * is returned.
 */
int rpcrdma_post_sends(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
	struct ib_send_wr *send_wr = &req->rl_wr;
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	int rc;

	if (!ep->re_send_count || kref_read(&req->rl_kref) > 1) {
		send_wr->send_flags |= IB_SEND_SIGNALED;
		ep->re_send_count = ep->re_send_batch;
	} else {
		send_wr->send_flags &= ~IB_SEND_SIGNALED;
		--ep->re_send_count;
	}

	trace_xprtrdma_post_send(req);
	rc = frwr_send(r_xprt, req);
	if (rc)
		return -ENOTCONN;
	return 0;
}
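
/* A worked example of the Send-signaling policy above: with
 * re_send_batch == 8, only every eighth Send WR carries
 * IB_SEND_SIGNALED. Sends on an RC QP complete in order, so one
 * Send completion retires that WR and the seven unsignaled WRs
 * posted before it. The kref_read() test forces a signal whenever
 * another code path still holds a reference to the req, so that
 * req's Send completion is not deferred behind the batch.
 */
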
/**
 * rpcrdma_post_recvs - Refill the Receive Queue
 * @r_xprt: controlling transport instance
 * @temp: mark Receive buffers to be deleted after use
 */
void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	struct ib_recv_wr *wr, *bad_wr;
	struct rpcrdma_rep *rep;
	int needed, count, rc;

	rc = 0;
	count = 0;

	needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
	if (likely(ep->re_receive_count > needed))
		goto out;
	needed -= ep->re_receive_count;
	if (!temp)
		needed += RPCRDMA_MAX_RECV_BATCH;

	if (atomic_inc_return(&ep->re_receiving) > 1)
		goto out;

	/* fast path: all needed reps can be found on the free list */
	wr = NULL;
	while (needed) {
		rep = rpcrdma_rep_get_locked(buf);
		if (rep && rep->rr_temp) {
			rpcrdma_rep_destroy(rep);
			continue;
		}
		if (!rep)
			rep = rpcrdma_rep_create(r_xprt, temp);
		if (!rep)
			break;

		rep->rr_cid.ci_queue_id = ep->re_attr.recv_cq->res.id;
		trace_xprtrdma_post_recv(rep);
		rep->rr_recv_wr.next = wr;
		wr = &rep->rr_recv_wr;
		--needed;
		++count;
	}
	if (!wr)
		goto out;

	rc = ib_post_recv(ep->re_id->qp, wr,
			  (const struct ib_recv_wr **)&bad_wr);
	if (atomic_dec_return(&ep->re_receiving) > 0)
		complete(&ep->re_done);

out:
	trace_xprtrdma_post_recvs(r_xprt, count, rc);
	if (rc) {
		for (wr = bad_wr; wr;) {
			struct rpcrdma_rep *rep;

			rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
			wr = wr->next;
			rpcrdma_recv_buffer_put(rep);
			--count;
		}
	}
	ep->re_receive_count += count;
}
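
/* Receive provisioning math, in brief: ep->re_receive_count is the
 * number of Receive WRs still posted. The target is the current
 * credit grant (rb_credits) plus two Receives per backchannel
 * request; RPCRDMA_MAX_RECV_BATCH extra are added on normal refills
 * so that posting happens in batches rather than once per
 * completion. For example, assuming RPCRDMA_MAX_RECV_BATCH is 7:
 * with rb_credits = 64, no backchannel, and 32 Receives already
 * posted, a !temp call posts 64 - 32 + 7 = 39 new Receive WRs.
 */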