	spin_lock(&buf->rb_recovery_lock);
	rpcrdma_mr_push(mr, &buf->rb_stale_mrs);
	spin_unlock(&buf->rb_recovery_lock);

	schedule_delayed_work(&buf->rb_recovery_worker, 0);
}

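/* Allocate a few MRs at a time, initialize them via the memory
 * registration ops, and splice them onto the transport's free and
 * all-MRs lists under rb_mrlock.
 */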
static void
rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	unsigned int count;
	LIST_HEAD(free);
	LIST_HEAD(all);

	for (count = 0; count < 3; count++) {
		struct rpcrdma_mr *mr;
		int rc;

		mr = kzalloc(sizeof(*mr), GFP_KERNEL);
		if (!mr)
			break;

		rc = ia->ri_ops->ro_init_mr(ia, mr);
		if (rc) {
			kfree(mr);
			break;
		}

		mr->mr_xprt = r_xprt;

		list_add(&mr->mr_list, &free);
		list_add(&mr->mr_all, &all);
	}

	spin_lock(&buf->rb_mrlock);
	list_splice(&free, &buf->rb_mrs);
	list_splice(&all, &buf->rb_all);
	r_xprt->rx_stats.mrs_allocated += count;
	spin_unlock(&buf->rb_mrlock);
	trace_xprtrdma_createmrs(r_xprt, count);
}

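/* Worker that replenishes the transport's free MR list; scheduled
 * by rpcrdma_mr_get when it finds the list empty.
 */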
static void
rpcrdma_mr_refresh_worker(struct work_struct *work)
{
	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
						  rb_refresh_worker.work);
	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
						   rx_buf);

	rpcrdma_mrs_create(r_xprt);
}

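/* Allocate an rpcrdma_req and the regbuf that carries its
 * RPC-over-RDMA transport header, then add the req to the
 * transport's list of all requests.
 */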
struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
	struct rpcrdma_regbuf *rb;
	struct rpcrdma_req *req;

	req = kzalloc(sizeof(*req), GFP_KERNEL);
	if (req == NULL)
		return ERR_PTR(-ENOMEM);

	rb = rpcrdma_alloc_regbuf(RPCRDMA_HDRBUF_SIZE,
				  DMA_TO_DEVICE, GFP_KERNEL);
	if (IS_ERR(rb)) {
		kfree(req);
		return ERR_PTR(-ENOMEM);
	}
	req->rl_rdmabuf = rb;
	xdr_buf_init(&req->rl_hdrbuf, rb->rg_base, rdmab_length(rb));
	req->rl_buffer = buffer;
	INIT_LIST_HEAD(&req->rl_registered);

	spin_lock(&buffer->rb_reqslock);
	list_add(&req->rl_all, &buffer->rb_allreqs);
	spin_unlock(&buffer->rb_reqslock);
	return req;
}

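/* Allocate an rpcrdma_rep and its DMA-mapped Receive buffer,
 * initialize the Receive WR, and add the rep to the transport's
 * rb_recv_bufs list.
 */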
static int
rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_rep *rep;
	int rc;

	rc = -ENOMEM;
	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
	if (rep == NULL)
		goto out;

	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(cdata->inline_rsize,
					       DMA_FROM_DEVICE, GFP_KERNEL);
	if (IS_ERR(rep->rr_rdmabuf)) {
		rc = PTR_ERR(rep->rr_rdmabuf);
		goto out_free;
	}
	xdr_buf_init(&rep->rr_hdrbuf, rep->rr_rdmabuf->rg_base,
		     rdmab_length(rep->rr_rdmabuf));
	rep->rr_cqe.done = rpcrdma_wc_receive;
	rep->rr_rxprt = r_xprt;
	INIT_WORK(&rep->rr_work, rpcrdma_deferred_completion);
	rep->rr_recv_wr.next = NULL;
	rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	rep->rr_recv_wr.num_sge = 1;
	rep->rr_temp = temp;

	spin_lock(&buf->rb_lock);
	list_add(&rep->rr_list, &buf->rb_recv_bufs);
	spin_unlock(&buf->rb_lock);
	return 0;

out_free:
	kfree(rep);
out:
	dprintk("RPC:       %s: reply buffer %d alloc failed\n",
		__func__, rc);
	return rc;
}

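/* Allocate and initialize the transport's buffer pool: MRs, send
 * requests, the Receive buffer list, and the send contexts.
 */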
int
rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	int i, rc;

	buf->rb_max_requests = r_xprt->rx_data.max_requests;
	buf->rb_bc_srv_max_requests = 0;
	spin_lock_init(&buf->rb_mrlock);
	spin_lock_init(&buf->rb_lock);
	spin_lock_init(&buf->rb_recovery_lock);
	INIT_LIST_HEAD(&buf->rb_mrs);
	INIT_LIST_HEAD(&buf->rb_all);
	INIT_LIST_HEAD(&buf->rb_stale_mrs);
	INIT_DELAYED_WORK(&buf->rb_refresh_worker,
			  rpcrdma_mr_refresh_worker);
	INIT_DELAYED_WORK(&buf->rb_recovery_worker,
			  rpcrdma_mr_recovery_worker);
	rpcrdma_mrs_create(r_xprt);
	INIT_LIST_HEAD(&buf->rb_send_bufs);
	INIT_LIST_HEAD(&buf->rb_allreqs);
	spin_lock_init(&buf->rb_reqslock);
	for (i = 0; i < buf->rb_max_requests; i++) {
		struct rpcrdma_req *req;

		req = rpcrdma_create_req(r_xprt);
		if (IS_ERR(req)) {
			dprintk("RPC:       %s: request buffer %d alloc"
				" failed\n", __func__, i);
			rc = PTR_ERR(req);
			goto out;
		}
		list_add(&req->rl_list, &buf->rb_send_bufs);
	}

	buf->rb_posted_receives = 0;
	INIT_LIST_HEAD(&buf->rb_recv_bufs);

	rc = rpcrdma_sendctxs_create(r_xprt);
	if (rc)
		goto out;

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
	return rc;
}

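/* Free a reply buffer and its DMA-mapped regbuf. */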
static void
rpcrdma_destroy_rep(struct rpcrdma_rep *rep)
{
	rpcrdma_free_regbuf(rep->rr_rdmabuf);
	kfree(rep);
}

void
rpcrdma_destroy_req(struct rpcrdma_req *req)
{
	rpcrdma_free_regbuf(req->rl_recvbuf);
	rpcrdma_free_regbuf(req->rl_sendbuf);
	rpcrdma_free_regbuf(req->rl_rdmabuf);
	kfree(req);
}
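
/* Release every MR on the transport's all-MRs list back to the
 * device, dropping rb_mrlock around each ro_release_mr call.
 */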
static void
rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
						   rx_buf);
	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
	struct rpcrdma_mr *mr;
	unsigned int count;

	count = 0;
	spin_lock(&buf->rb_mrlock);
	while (!list_empty(&buf->rb_all)) {
		mr = list_entry(buf->rb_all.next, struct rpcrdma_mr, mr_all);
		list_del(&mr->mr_all);
		spin_unlock(&buf->rb_mrlock);
		ia->ri_ops->ro_release_mr(mr);
		count++;
		spin_lock(&buf->rb_mrlock);
	}
	spin_unlock(&buf->rb_mrlock);
	r_xprt->rx_stats.mrs_allocated = 0;

	dprintk("RPC:       %s: released %u MRs\n", __func__, count);
}

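/* Tear down the buffer pool built by rpcrdma_buffer_create: cancel
 * the MR workers, then destroy send contexts, reply buffers, send
 * requests, and MRs.
 */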
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	cancel_delayed_work_sync(&buf->rb_recovery_worker);
	cancel_delayed_work_sync(&buf->rb_refresh_worker);
	rpcrdma_sendctxs_destroy(buf);

	while (!list_empty(&buf->rb_recv_bufs)) {
		struct rpcrdma_rep *rep;
		rep = list_first_entry(&buf->rb_recv_bufs,
				       struct rpcrdma_rep, rr_list);
		list_del(&rep->rr_list);
		rpcrdma_destroy_rep(rep);
	}

	spin_lock(&buf->rb_reqslock);
	while (!list_empty(&buf->rb_allreqs)) {
		struct rpcrdma_req *req;
		req = list_first_entry(&buf->rb_allreqs,
				       struct rpcrdma_req, rl_all);
		list_del(&req->rl_all);

		spin_unlock(&buf->rb_reqslock);
		rpcrdma_destroy_req(req);
		spin_lock(&buf->rb_reqslock);
	}
	spin_unlock(&buf->rb_reqslock);

	rpcrdma_mrs_destroy(buf);
}

/**
 * rpcrdma_mr_get - Allocate an rpcrdma_mr object
 * @r_xprt: controlling transport
 *
 * Returns an initialized rpcrdma_mr or NULL if no free
 * rpcrdma_mr objects are available.
 */
struct rpcrdma_mr *
rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mr *mr = NULL;

	spin_lock(&buf->rb_mrlock);
	if (!list_empty(&buf->rb_mrs))
		mr = rpcrdma_mr_pop(&buf->rb_mrs);
	spin_unlock(&buf->rb_mrlock);
	if (!mr)
		goto out_nomrs;
	return mr;
out_nomrs:
	if (r_xprt->rx_ep.rep_connected != -ENODEV)
		schedule_delayed_work(&buf->rb_refresh_worker, 0);

	/* Allow the reply handler and refresh worker to run */
	cond_resched();

	return NULL;
}

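/* Return an MR to the transport's free MR list. */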
static void
__rpcrdma_mr_put(struct rpcrdma_buffer *buf, struct rpcrdma_mr *mr)
{
	spin_lock(&buf->rb_mrlock);
	rpcrdma_mr_push(mr, &buf->rb_mrs);
	spin_unlock(&buf->rb_mrlock);
}

/**
 * rpcrdma_mr_put - Release an rpcrdma_mr object
 * @mr: object to release
 *
 */
void
rpcrdma_mr_put(struct rpcrdma_mr *mr)
{
	__rpcrdma_mr_put(&mr->mr_xprt->rx_buf, mr);
}

/**
 * rpcrdma_mr_unmap_and_put - DMA unmap an MR and release it
 * @mr: object to release
 *
 */
void
rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr)
{
	struct rpcrdma_xprt *r_xprt = mr->mr_xprt;

	trace_xprtrdma_dma_unmap(mr);
	ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
			mr->mr_sg, mr->mr_nents, mr->mr_dir);
	__rpcrdma_mr_put(&r_xprt->rx_buf, mr);
}

/**
 * rpcrdma_buffer_get - Get a request buffer
 * @buffers: Buffer pool from which to obtain a buffer
 *
 * Returns a fresh rpcrdma_req, or NULL if none are available.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;

	spin_lock(&buffers->rb_lock);
	req = list_first_entry_or_null(&buffers->rb_send_bufs,
				       struct rpcrdma_req, rl_list);
	if (req)
		list_del_init(&req->rl_list);
	spin_unlock(&buffers->rb_lock);

	return req;
}

/**
 * rpcrdma_buffer_put - Put request/reply buffers back into pool
 * @req: object to return
 *
 */
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;
	struct rpcrdma_rep *rep = req->rl_reply;

	req->rl_reply = NULL;

	spin_lock(&buffers->rb_lock);
	list_add(&req->rl_list, &buffers->rb_send_bufs);
	if (rep) {
		if (!rep->rr_temp) {
			list_add(&rep->rr_list, &buffers->rb_recv_bufs);
			rep = NULL;
		}
	}
	spin_unlock(&buffers->rb_lock);
	if (rep)
		rpcrdma_destroy_rep(rep);
}

/*
 * Put reply buffers back into pool when not attached to
 * request. This happens in error conditions.
 */
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;

	if (!rep->rr_temp) {
		spin_lock(&buffers->rb_lock);
		list_add(&rep->rr_list, &buffers->rb_recv_bufs);
		spin_unlock(&buffers->rb_lock);
	} else {
		rpcrdma_destroy_rep(rep);
	}
}

/**
 * rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers
 * @size: size of buffer to be allocated, in bytes
 * @direction: direction of data movement
 * @flags: GFP flags
 *
 * Returns an ERR_PTR, or a pointer to a regbuf, a buffer that
 * can be persistently DMA-mapped for I/O.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. During Long Calls
 * or Replies they may be registered externally via ro_map.
 */
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction,
		     gfp_t flags)
{
	struct rpcrdma_regbuf *rb;

	rb = kmalloc(sizeof(*rb) + size, flags);
	if (rb == NULL)
		return ERR_PTR(-ENOMEM);
	rb->rg_device = NULL;
	rb->rg_direction = direction;
	rb->rg_iov.length = size;

	return rb;
}

/**
 * __rpcrdma_dma_map_regbuf - DMA-map a regbuf
 * @ia: controlling rpcrdma_ia
 * @rb: regbuf to be mapped
 */
bool
__rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
	struct ib_device *device = ia->ri_device;

	if (rb->rg_direction == DMA_NONE)
		return false;

	rb->rg_iov.addr = ib_dma_map_single(device,
					    (void *)rb->rg_base,
					    rdmab_length(rb),
					    rb->rg_direction);
	if (ib_dma_mapping_error(device, rdmab_addr(rb)))
		return false;

	rb->rg_device = device;
	rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey;
	return true;
}

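/* DMA unmap a regbuf, if it is currently mapped. */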
static void
rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
{
	if (!rpcrdma_regbuf_is_mapped(rb))
		return;

	ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb),
			    rdmab_length(rb), rb->rg_direction);
	rb->rg_device = NULL;
}

/**
 * rpcrdma_free_regbuf - deregister and free registered buffer
 * @rb: regbuf to be deregistered and freed
 */
void
rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb)
{
	rpcrdma_dma_unmap_regbuf(rb);
	kfree(rb);
}

/*
 * Prepost any receive buffer, then post send.
 *
 * Receive buffer is donated to hardware, reclaimed upon recv completion.
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
		struct rpcrdma_ep *ep,
		struct rpcrdma_req *req)
{
	struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
	int rc;

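	/* Signal a Send completion periodically (once every
	 * rep_send_batch Sends), or whenever this Send carries TX
	 * resources that must be released by the completion handler.
	 */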
	if (!ep->rep_send_count ||
	    test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
		send_wr->send_flags |= IB_SEND_SIGNALED;
		ep->rep_send_count = ep->rep_send_batch;
	} else {
		send_wr->send_flags &= ~IB_SEND_SIGNALED;
		--ep->rep_send_count;
	}

	rc = ia->ri_ops->ro_send(ia, req);
	trace_xprtrdma_post_send(req, rc);
	if (rc)
		return -ENOTCONN;
	return 0;
}

/**
 * rpcrdma_post_recvs - Maybe post some Receive buffers
 * @r_xprt: controlling transport
 * @temp: when true, allocate temp rpcrdma_rep objects
 *
 */
void
rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct ib_recv_wr *wr, *bad_wr;
	int needed, count, rc;

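	/* Keep enough Receives posted to cover the current credit
	 * grant, plus headroom for backchannel requests.
	 */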
	needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
	if (buf->rb_posted_receives > needed)
		return;
	needed -= buf->rb_posted_receives;

	count = 0;
	wr = NULL;
	while (needed) {
		struct rpcrdma_regbuf *rb;
		struct rpcrdma_rep *rep;

		spin_lock(&buf->rb_lock);
		rep = list_first_entry_or_null(&buf->rb_recv_bufs,
					       struct rpcrdma_rep, rr_list);
		if (likely(rep))
			list_del(&rep->rr_list);
		spin_unlock(&buf->rb_lock);
		if (!rep) {
			if (rpcrdma_create_rep(r_xprt, temp))
				break;
			continue;
		}

		rb = rep->rr_rdmabuf;
		if (!rpcrdma_regbuf_is_mapped(rb)) {
			if (!__rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, rb)) {
				rpcrdma_recv_buffer_put(rep);
				break;
			}
		}

		trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe);
		rep->rr_recv_wr.next = wr;
		wr = &rep->rr_recv_wr;
		++count;
		--needed;
	}
	if (!count)
		return;

	rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr, &bad_wr);
	if (rc) {
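		/* The WR pointed to by bad_wr and its successors were
		 * not posted; return their reps to the buffer pool.
		 */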
		for (wr = bad_wr; wr; wr = wr->next) {
			struct rpcrdma_rep *rep;

			rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
			rpcrdma_recv_buffer_put(rep);
			--count;
		}
	}
	buf->rb_posted_receives += count;
	trace_xprtrdma_post_recvs(r_xprt, count, rc);
}