struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
				       gfp_t flags)
{
	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
	struct rpcrdma_regbuf *rb;
	struct rpcrdma_req *req;

	req = kzalloc(sizeof(*req), flags);
	if (req == NULL)
		goto out1;

	rb = rpcrdma_regbuf_alloc(RPCRDMA_HDRBUF_SIZE, DMA_TO_DEVICE, flags);
	if (!rb)
		goto out2;
	req->rl_rdmabuf = rb;
	xdr_buf_init(&req->rl_hdrbuf, rdmab_data(rb), rdmab_length(rb));
	req->rl_sendbuf = rpcrdma_regbuf_alloc(size, DMA_TO_DEVICE, flags);
	if (!req->rl_sendbuf)
		goto out3;

	req->rl_recvbuf = rpcrdma_regbuf_alloc(size, DMA_NONE, flags);
	if (!req->rl_recvbuf)
		goto out4;

	req->rl_buffer = buffer;
	INIT_LIST_HEAD(&req->rl_registered);
	spin_lock(&buffer->rb_lock);
	list_add(&req->rl_all, &buffer->rb_allreqs);
	spin_unlock(&buffer->rb_lock);
	return req;

out4:
	kfree(req->rl_sendbuf);
out3:
	kfree(req->rl_rdmabuf);
out2:
	kfree(req);
out1:
	return NULL;
}
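
/* Each rpcrdma_req owns three regbufs: rl_rdmabuf carries the
 * RPC-over-RDMA transport header, rl_sendbuf the marshaled RPC Call,
 * and rl_recvbuf is, in effect, a pull-up area for Reply data.
 * rl_recvbuf is created DMA_NONE because the device never accesses it
 * directly; Receive payloads land in a rep's rr_rdmabuf (see
 * rpcrdma_rep_create below).
 */
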
static bool rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, bool temp)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_rep *rep;

	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
	if (rep == NULL)
		goto out;

	rep->rr_rdmabuf = rpcrdma_regbuf_alloc(r_xprt->rx_ep.rep_inline_recv,
					       DMA_FROM_DEVICE, GFP_KERNEL);
	if (!rep->rr_rdmabuf)
		goto out_free;

	xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf),
		     rdmab_length(rep->rr_rdmabuf));
	rep->rr_cqe.done = rpcrdma_wc_receive;
	rep->rr_rxprt = r_xprt;
	INIT_WORK(&rep->rr_work, rpcrdma_deferred_completion);
	rep->rr_recv_wr.next = NULL;
	rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	rep->rr_recv_wr.num_sge = 1;
	rep->rr_temp = temp;

	spin_lock(&buf->rb_lock);
	list_add(&rep->rr_list, &buf->rb_recv_bufs);
	spin_unlock(&buf->rb_lock);
	return true;

out_free:
	kfree(rep);
out:
	return false;
}
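
/* A rep represents one posted Receive: rr_recv_wr points at a single
 * SGE backed by rr_rdmabuf, sized by rep_inline_recv. Reps created
 * with @temp set are transient; rpcrdma_recv_buffer_put() destroys
 * them instead of returning them to rb_recv_bufs.
 */
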
/**
 * rpcrdma_buffer_create - Create initial set of req/rep objects
 * @r_xprt: transport instance to (re)initialize
 *
 * Returns zero on success, otherwise a negative errno.
 */
int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	int i, rc;

	buf->rb_flags = 0;
	buf->rb_max_requests = r_xprt->rx_ep.rep_max_requests;
	buf->rb_bc_srv_max_requests = 0;
	spin_lock_init(&buf->rb_mrlock);
	spin_lock_init(&buf->rb_lock);
	INIT_LIST_HEAD(&buf->rb_mrs);
	INIT_LIST_HEAD(&buf->rb_all);
	INIT_DELAYED_WORK(&buf->rb_refresh_worker,
			  rpcrdma_mr_refresh_worker);
	rpcrdma_mrs_create(r_xprt);
	INIT_LIST_HEAD(&buf->rb_send_bufs);
	INIT_LIST_HEAD(&buf->rb_allreqs);
	rc = -ENOMEM;
	for (i = 0; i < buf->rb_max_requests; i++) {
		struct rpcrdma_req *req;

		req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE,
					 GFP_KERNEL);
		if (!req)
			goto out;
		list_add(&req->rl_list, &buf->rb_send_bufs);
	}

	buf->rb_credits = 1;
	INIT_LIST_HEAD(&buf->rb_recv_bufs);
	rc = rpcrdma_sendctxs_create(r_xprt);
	if (rc)
		goto out;

	buf->rb_completion_wq = alloc_workqueue("rpcrdma-%s",
						WQ_MEM_RECLAIM | WQ_HIGHPRI,
						0,
			r_xprt->rx_xprt.address_strings[RPC_DISPLAY_ADDR]);
	if (!buf->rb_completion_wq) {
		rc = -ENOMEM;
		goto out;
	}

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
	return rc;
}
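
/* Rough pairing at the transport level (illustrative sketch; the
 * actual call sites live in xprtrdma's transport setup and destroy
 * paths, and "out_fail" below is a hypothetical label):
 *
 *	rc = rpcrdma_buffer_create(r_xprt);
 *	if (rc)
 *		goto out_fail;
 *	...
 *	rpcrdma_buffer_destroy(&r_xprt->rx_buf);
 */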

static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
{
	rpcrdma_regbuf_free(rep->rr_rdmabuf);
	kfree(rep);
}

/**
 * rpcrdma_req_destroy - Destroy an rpcrdma_req object
 * @req: unused object to be destroyed
 *
 * This function assumes that the caller prevents concurrent device
 * unload and transport tear-down.
 */
void
rpcrdma_req_destroy(struct rpcrdma_req *req)
{
	list_del(&req->rl_all);

	rpcrdma_regbuf_free(req->rl_recvbuf);
	rpcrdma_regbuf_free(req->rl_sendbuf);
	rpcrdma_regbuf_free(req->rl_rdmabuf);
	kfree(req);
}

static void
rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
						   rx_buf);
	struct rpcrdma_mr *mr;
	unsigned int count;

	count = 0;
	spin_lock(&buf->rb_mrlock);
	while (!list_empty(&buf->rb_all)) {
		mr = list_entry(buf->rb_all.next, struct rpcrdma_mr, mr_all);
		list_del(&mr->mr_all);
		spin_unlock(&buf->rb_mrlock);

		/* Ensure MW is not on any rl_registered list */
		if (!list_empty(&mr->mr_list))
			list_del(&mr->mr_list);

		frwr_release_mr(mr);
		count++;
		spin_lock(&buf->rb_mrlock);
	}
	spin_unlock(&buf->rb_mrlock);
	r_xprt->rx_stats.mrs_allocated = 0;

	dprintk("RPC:       %s: released %u MRs\n", __func__, count);
}

/**
 * rpcrdma_buffer_destroy - Release all hw resources
 * @buf: root control block for resources
 *
 * ORDERING: relies on a prior ib_drain_qp:
 * - No more Send or Receive completions can occur
 * - All MRs, reps, and reqs are returned to their free lists
 */
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	cancel_delayed_work_sync(&buf->rb_refresh_worker);
	if (buf->rb_completion_wq) {
		destroy_workqueue(buf->rb_completion_wq);
		buf->rb_completion_wq = NULL;
	}

	rpcrdma_sendctxs_destroy(buf);

	while (!list_empty(&buf->rb_recv_bufs)) {
		struct rpcrdma_rep *rep;
		rep = list_first_entry(&buf->rb_recv_bufs,
				       struct rpcrdma_rep, rr_list);
		list_del(&rep->rr_list);
		rpcrdma_rep_destroy(rep);
	}

	while (!list_empty(&buf->rb_send_bufs)) {
		struct rpcrdma_req *req;
		req = list_first_entry(&buf->rb_send_bufs,
				       struct rpcrdma_req, rl_list);
		list_del(&req->rl_list);
		rpcrdma_req_destroy(req);
	}

	rpcrdma_mrs_destroy(buf);
}

/**
 * rpcrdma_mr_get - Allocate an rpcrdma_mr object
 * @r_xprt: controlling transport
 *
 * Returns an initialized rpcrdma_mr or NULL if no free
 * rpcrdma_mr objects are available.
 */
struct rpcrdma_mr *
rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mr *mr = NULL;
	spin_lock(&buf->rb_mrlock);
	if (!list_empty(&buf->rb_mrs))
		mr = rpcrdma_mr_pop(&buf->rb_mrs);
	spin_unlock(&buf->rb_mrlock);
	if (!mr)
		goto out_nomrs;
	return mr;
out_nomrs:
	if (r_xprt->rx_ep.rep_connected != -ENODEV)
		schedule_delayed_work(&buf->rb_refresh_worker, 0);

	/* Allow the reply handler and refresh worker to run */
	cond_resched();

	return NULL;
}
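
/* Rough MR life cycle (illustrative; the real consumers are elsewhere
 * in xprtrdma): a registration path takes an MR with rpcrdma_mr_get()
 * and maps it with frwr_map(); once the Reply has been handled the MR
 * comes back via rpcrdma_mr_unmap_and_put(), or rpcrdma_mr_put() if it
 * was never DMA mapped.
 */
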
static void
__rpcrdma_mr_put(struct rpcrdma_buffer *buf, struct rpcrdma_mr *mr)
{
	spin_lock(&buf->rb_mrlock);
	rpcrdma_mr_push(mr, &buf->rb_mrs);
	spin_unlock(&buf->rb_mrlock);
}

/**
 * rpcrdma_mr_put - Release an rpcrdma_mr object
 * @mr: object to release
 *
 */
void
rpcrdma_mr_put(struct rpcrdma_mr *mr)
{
	__rpcrdma_mr_put(&mr->mr_xprt->rx_buf, mr);
}

/**
 * rpcrdma_mr_unmap_and_put - DMA unmap an MR and release it
 * @mr: object to release
 *
 */
void
rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr)
{
	struct rpcrdma_xprt *r_xprt = mr->mr_xprt;

	if (mr->mr_dir != DMA_NONE) {
		trace_xprtrdma_mr_unmap(mr);
		ib_dma_unmap_sg(r_xprt->rx_ia.ri_id->device,
				mr->mr_sg, mr->mr_nents, mr->mr_dir);
		mr->mr_dir = DMA_NONE;
	}
	__rpcrdma_mr_put(&r_xprt->rx_buf, mr);
}

/**
 * rpcrdma_buffer_get - Get a request buffer
 * @buffers: Buffer pool from which to obtain a buffer
 *
 * Returns a fresh rpcrdma_req, or NULL if none are available.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;
	spin_lock(&buffers->rb_lock);
	req = list_first_entry_or_null(&buffers->rb_send_bufs,
				       struct rpcrdma_req, rl_list);
	if (req)
		list_del_init(&req->rl_list);
	spin_unlock(&buffers->rb_lock);
	return req;
}
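
/* Rough usage sketch (illustrative; the actual caller is the
 * transport's request allocation path):
 *
 *	req = rpcrdma_buffer_get(&r_xprt->rx_buf);
 *	if (!req)
 *		return NULL;
 *	...
 *	rpcrdma_buffer_put(req);
 */
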
/**
 * rpcrdma_buffer_put - Put request/reply buffers back into pool
 * @req: object to return
 *
 */
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;
	struct rpcrdma_rep *rep = req->rl_reply;
	req->rl_reply = NULL;

	spin_lock(&buffers->rb_lock);
	list_add(&req->rl_list, &buffers->rb_send_bufs);
	if (rep) {
		if (!rep->rr_temp) {
			list_add(&rep->rr_list, &buffers->rb_recv_bufs);
			rep = NULL;
		}
	}
	spin_unlock(&buffers->rb_lock);
	if (rep)
		rpcrdma_rep_destroy(rep);
}

/*
 * Put reply buffers back into pool when not attached to
 * request. This happens in error conditions.
 */
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
	if (!rep->rr_temp) {
		spin_lock(&buffers->rb_lock);
		list_add(&rep->rr_list, &buffers->rb_recv_bufs);
		spin_unlock(&buffers->rb_lock);
	} else {
		rpcrdma_rep_destroy(rep);
	}
}

/* Returns a pointer to a rpcrdma_regbuf object, or NULL.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. During Long Calls
 * or Replies they may be registered externally via frwr_map.
 */
static struct rpcrdma_regbuf *
rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
		     gfp_t flags)
{
	struct rpcrdma_regbuf *rb;

	rb = kmalloc(sizeof(*rb), flags);
	if (!rb)
		return NULL;
	rb->rg_data = kmalloc(size, flags);
	if (!rb->rg_data) {
		kfree(rb);
		return NULL;
	}
	rb->rg_device = NULL;
	rb->rg_direction = direction;
	rb->rg_iov.length = size;

	return rb;
}

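/* Note that a regbuf is not DMA mapped at allocation time: rg_device
 * stays NULL until __rpcrdma_regbuf_dma_map() runs on first use, which
 * lets buffers be set up before the transport is connected to a device.
 */
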
/**
 * rpcrdma_regbuf_realloc - re-allocate a SEND/RECV buffer
 * @rb: regbuf to reallocate
 * @size: size of buffer to be allocated, in bytes
 * @flags: GFP flags
 *
 * Returns true if reallocation was successful. If false is
 * returned, @rb is left untouched.
 */
bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, gfp_t flags)
{
	void *buf;

	buf = kmalloc(size, flags);
	if (!buf)
		return false;

	rpcrdma_regbuf_dma_unmap(rb);
	kfree(rb->rg_data);

	rb->rg_data = buf;
	rb->rg_iov.length = size;
	return true;
}
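
/* Sketch of a typical call (illustrative; the real caller grows
 * rl_sendbuf when an RPC needs more inline space than the regbuf
 * currently provides):
 *
 *	if (rdmab_length(req->rl_sendbuf) < size &&
 *	    !rpcrdma_regbuf_realloc(req->rl_sendbuf, size, flags))
 *		return -ENOMEM;
 */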

/**
 * __rpcrdma_regbuf_dma_map - DMA-map a regbuf
 * @r_xprt: controlling transport instance
 * @rb: regbuf to be mapped
 *
 * Returns true if the buffer is now DMA mapped to @r_xprt's device
 */
bool __rpcrdma_regbuf_dma_map(struct rpcrdma_xprt *r_xprt,
			      struct rpcrdma_regbuf *rb)
{
	struct ib_device *device = r_xprt->rx_ia.ri_id->device;

	if (rb->rg_direction == DMA_NONE)
		return false;

	rb->rg_iov.addr = ib_dma_map_single(device, rdmab_data(rb),
					    rdmab_length(rb), rb->rg_direction);
	if (ib_dma_mapping_error(device, rdmab_addr(rb))) {
		trace_xprtrdma_dma_maperr(rdmab_addr(rb));
		return false;
	}

	rb->rg_device = device;
	rb->rg_iov.lkey = r_xprt->rx_ia.ri_pd->local_dma_lkey;
	return true;
}

static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb)
{
	if (!rpcrdma_regbuf_is_mapped(rb))
		return;

	ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb), rdmab_length(rb),
			    rb->rg_direction);
	rb->rg_device = NULL;
}

static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb)
{
	if (!rb)
		return;

	rpcrdma_regbuf_dma_unmap(rb);
	kfree(rb->rg_data);
	kfree(rb);
}

/**
 * rpcrdma_ep_post - Post WRs to a transport's Send Queue
 * @ia: transport's device information
 * @ep: transport's RDMA endpoint information
 * @req: rpcrdma_req containing the Send WR to post
 *
 * Returns 0 if the post was successful, otherwise -ENOTCONN
 * is returned.
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
		struct rpcrdma_ep *ep,
		struct rpcrdma_req *req)
{
	struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
	int rc;

	if (!ep->rep_send_count ||
	    test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
		send_wr->send_flags |= IB_SEND_SIGNALED;
		ep->rep_send_count = ep->rep_send_batch;
	} else {
		send_wr->send_flags &= ~IB_SEND_SIGNALED;
		--ep->rep_send_count;
	}

	rc = frwr_send(ia, req);
	trace_xprtrdma_post_send(req, rc);
	if (rc)
		return -ENOTCONN;
	return 0;
}
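
/* Design note: most Sends are posted unsignaled to limit completion
 * interrupts. A Send is signaled when rep_send_count runs out (roughly
 * one in every rep_send_batch) or when the req has been flagged as
 * holding TX resources, which keeps the provider's Send Queue
 * accounting from falling behind.
 */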
static void
rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct ib_recv_wr *wr, *bad_wr;
	int needed, count, rc;

	rc = 0;
	count = 0;
	needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
	if (ep->rep_receive_count > needed)
		goto out;
	needed -= ep->rep_receive_count;
	if (!temp)
		needed += RPCRDMA_MAX_RECV_BATCH;

	wr = NULL;
	while (needed) {
		struct rpcrdma_regbuf *rb;
		struct rpcrdma_rep *rep;
		spin_lock(&buf->rb_lock);
		rep = list_first_entry_or_null(&buf->rb_recv_bufs,
					       struct rpcrdma_rep, rr_list);
		if (likely(rep))
			list_del(&rep->rr_list);
		spin_unlock(&buf->rb_lock);
		if (!rep) {
			if (!rpcrdma_rep_create(r_xprt, temp))
				break;
			continue;
		}

		rb = rep->rr_rdmabuf;
		if (!rpcrdma_regbuf_dma_map(r_xprt, rb)) {
			rpcrdma_recv_buffer_put(rep);
			break;
		}

		trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe);
		rep->rr_recv_wr.next = wr;
		wr = &rep->rr_recv_wr;
		++count;
		--needed;
	}
	if (!count)
		goto out;

	rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr,
			  (const struct ib_recv_wr **)&bad_wr);
	if (rc) {
		for (wr = bad_wr; wr;) {
			struct rpcrdma_rep *rep;

			rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
			/* Advance before put: the rep that owns this WR
			 * may be destroyed by rpcrdma_recv_buffer_put().
			 */
			wr = wr->next;
			rpcrdma_recv_buffer_put(rep);
			--count;
		}
	}
out:
	ep->rep_receive_count += count;
	trace_xprtrdma_post_recvs(r_xprt, count, rc);
}
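
/* Design note: Receives are provisioned against the server's credit
 * grant (rb_credits) plus room for backchannel calls, and the queue is
 * padded by up to RPCRDMA_MAX_RECV_BATCH so it does not need to be
 * refilled on every Receive completion.
 */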