Commit 895aa7b1 authored by Jens Axboe's avatar Jens Axboe
Browse files

Merge branch 'async-buffered.8' into for-5.9/io_uring

Pull in async buffered reads branch.

* async-buffered.8:
  io_uring: support true async buffered reads, if file provides it
  mm: add kiocb_wait_page_queue_init() helper
  btrfs: flag files as supporting buffered async reads
  xfs: flag files as supporting buffered async reads
  block: flag block devices as supporting IOCB_WAITQ
  fs: add FMODE_BUF_RASYNC
  mm: support async buffered reads in generic_file_buffered_read()
  mm: add support for async page locking
  mm: abstract out wake_page_match() from wake_page_function()
  mm: allow read-ahead with IOCB_NOWAIT set
  io_uring: re-issue block requests that failed because of resources
  io_uring: catch -EIO from buffered issue request failure
  io_uring: always plug for any number of IOs
  block: provide plug based way of signaling forced no-wait semantics
parents 2e0464d4 bcf5a063
Loading
Loading
Loading
Loading
+6 −0
Original line number Diff line number Diff line
@@ -958,6 +958,7 @@ generic_make_request_checks(struct bio *bio)
	struct request_queue *q;
	int nr_sectors = bio_sectors(bio);
	blk_status_t status = BLK_STS_IOERR;
	struct blk_plug *plug;
	char b[BDEVNAME_SIZE];

	might_sleep();
@@ -971,6 +972,10 @@ generic_make_request_checks(struct bio *bio)
		goto end_io;
	}

	plug = blk_mq_plug(q, bio);
	if (plug && plug->nowait)
		bio->bi_opf |= REQ_NOWAIT;

	/*
	 * For a REQ_NOWAIT based request, return -EOPNOTSUPP
	 * if queue is not a request based queue.
@@ -1800,6 +1805,7 @@ void blk_start_plug(struct blk_plug *plug)
	INIT_LIST_HEAD(&plug->cb_list);
	plug->rq_count = 0;
	plug->multiple_queues = false;
	plug->nowait = false;

	/*
	 * Store ordering should not be needed here, since a potential
+1 −1
Original line number Diff line number Diff line
@@ -1851,7 +1851,7 @@ static int blkdev_open(struct inode * inode, struct file * filp)
	 */
	filp->f_flags |= O_LARGEFILE;

	filp->f_mode |= FMODE_NOWAIT;
	filp->f_mode |= FMODE_NOWAIT | FMODE_BUF_RASYNC;

	if (filp->f_flags & O_NDELAY)
		filp->f_mode |= FMODE_NDELAY;
+1 −1
Original line number Diff line number Diff line
@@ -3472,7 +3472,7 @@ static loff_t btrfs_file_llseek(struct file *file, loff_t offset, int whence)

static int btrfs_file_open(struct inode *inode, struct file *filp)
{
	filp->f_mode |= FMODE_NOWAIT;
	filp->f_mode |= FMODE_NOWAIT | FMODE_BUF_RASYNC;
	return generic_file_open(inode, filp);
}

+287 −43
Original line number Diff line number Diff line
@@ -78,6 +78,7 @@
#include <linux/fs_struct.h>
#include <linux/splice.h>
#include <linux/task_work.h>
#include <linux/pagemap.h>

#define CREATE_TRACE_POINTS
#include <trace/events/io_uring.h>
@@ -503,6 +504,8 @@ struct io_async_rw {
	struct iovec			*iov;
	ssize_t				nr_segs;
	ssize_t				size;
	struct wait_page_queue		wpq;
	struct callback_head		task_work;
};

struct io_async_ctx {
@@ -676,7 +679,6 @@ struct io_kiocb {
	};
};

#define IO_PLUG_THRESHOLD		2
#define IO_IOPOLL_BATCH			8

struct io_submit_state {
@@ -901,6 +903,13 @@ static int io_file_get(struct io_submit_state *state, struct io_kiocb *req,
static void __io_queue_sqe(struct io_kiocb *req,
			   const struct io_uring_sqe *sqe);

static ssize_t io_import_iovec(int rw, struct io_kiocb *req,
			       struct iovec **iovec, struct iov_iter *iter,
			       bool needs_lock);
static int io_setup_async_rw(struct io_kiocb *req, ssize_t io_size,
			     struct iovec *iovec, struct iovec *fast_iov,
			     struct iov_iter *iter);

static struct kmem_cache *req_cachep;

static const struct file_operations io_uring_fops;
@@ -1979,13 +1988,116 @@ static void io_complete_rw_common(struct kiocb *kiocb, long res)
	__io_cqring_add_event(req, res, cflags);
}

static void io_sq_thread_drop_mm(struct io_ring_ctx *ctx)
{
	struct mm_struct *mm = current->mm;

	if (mm) {
		kthread_unuse_mm(mm);
		mmput(mm);
	}
}

static int io_sq_thread_acquire_mm(struct io_ring_ctx *ctx,
				   struct io_kiocb *req)
{
	if (io_op_defs[req->opcode].needs_mm && !current->mm) {
		if (unlikely(!mmget_not_zero(ctx->sqo_mm)))
			return -EFAULT;
		kthread_use_mm(ctx->sqo_mm);
	}

	return 0;
}

#ifdef CONFIG_BLOCK
static bool io_resubmit_prep(struct io_kiocb *req, int error)
{
	struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
	ssize_t ret = -ECANCELED;
	struct iov_iter iter;
	int rw;

	if (error) {
		ret = error;
		goto end_req;
	}

	switch (req->opcode) {
	case IORING_OP_READV:
	case IORING_OP_READ_FIXED:
	case IORING_OP_READ:
		rw = READ;
		break;
	case IORING_OP_WRITEV:
	case IORING_OP_WRITE_FIXED:
	case IORING_OP_WRITE:
		rw = WRITE;
		break;
	default:
		printk_once(KERN_WARNING "io_uring: bad opcode in resubmit %d\n",
				req->opcode);
		goto end_req;
	}

	ret = io_import_iovec(rw, req, &iovec, &iter, false);
	if (ret < 0)
		goto end_req;
	ret = io_setup_async_rw(req, ret, iovec, inline_vecs, &iter);
	if (!ret)
		return true;
	kfree(iovec);
end_req:
	io_cqring_add_event(req, ret);
	req_set_fail_links(req);
	io_put_req(req);
	return false;
}

static void io_rw_resubmit(struct callback_head *cb)
{
	struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work);
	struct io_ring_ctx *ctx = req->ctx;
	int err;

	__set_current_state(TASK_RUNNING);

	err = io_sq_thread_acquire_mm(ctx, req);

	if (io_resubmit_prep(req, err)) {
		refcount_inc(&req->refs);
		io_queue_async_work(req);
	}
}
#endif

static bool io_rw_reissue(struct io_kiocb *req, long res)
{
#ifdef CONFIG_BLOCK
	struct task_struct *tsk;
	int ret;

	if ((res != -EAGAIN && res != -EOPNOTSUPP) || io_wq_current_is_worker())
		return false;

	tsk = req->task;
	init_task_work(&req->task_work, io_rw_resubmit);
	ret = task_work_add(tsk, &req->task_work, true);
	if (!ret)
		return true;
#endif
	return false;
}

static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
{
	struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);

	if (!io_rw_reissue(req, res)) {
		io_complete_rw_common(kiocb, res);
		io_put_req(req);
	}
}

static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
{
@@ -2089,6 +2201,15 @@ static struct file *__io_file_get(struct io_submit_state *state, int fd)
	return state->file;
}

static bool io_bdev_nowait(struct block_device *bdev)
{
#ifdef CONFIG_BLOCK
	return !bdev || queue_is_mq(bdev_get_queue(bdev));
#else
	return true;
#endif
}

/*
 * If we tracked the file through the SCM inflight mechanism, we could support
 * any file. For now, just ensure that anything potentially problematic is done
@@ -2098,10 +2219,19 @@ static bool io_file_supports_async(struct file *file, int rw)
{
	umode_t mode = file_inode(file)->i_mode;

	if (S_ISBLK(mode) || S_ISCHR(mode) || S_ISSOCK(mode))
	if (S_ISBLK(mode)) {
		if (io_bdev_nowait(file->f_inode->i_bdev))
			return true;
	if (S_ISREG(mode) && file->f_op != &io_uring_fops)
		return false;
	}
	if (S_ISCHR(mode) || S_ISSOCK(mode))
		return true;
	if (S_ISREG(mode)) {
		if (io_bdev_nowait(file->f_inode->i_sb->s_bdev) &&
		    file->f_op != &io_uring_fops)
			return true;
		return false;
	}

	/* any ->read/write should understand O_NONBLOCK */
	if (file->f_flags & O_NONBLOCK)
@@ -2152,6 +2282,9 @@ static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe,
	if (kiocb->ki_flags & IOCB_NOWAIT)
		req->flags |= REQ_F_NOWAIT;

	if (kiocb->ki_flags & IOCB_DIRECT)
		io_get_req_task(req);

	if (force_nonblock)
		kiocb->ki_flags |= IOCB_NOWAIT;

@@ -2620,6 +2753,126 @@ static int io_read_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
	return 0;
}

static void __io_async_buf_error(struct io_kiocb *req, int error)
{
	struct io_ring_ctx *ctx = req->ctx;

	spin_lock_irq(&ctx->completion_lock);
	io_cqring_fill_event(req, error);
	io_commit_cqring(ctx);
	spin_unlock_irq(&ctx->completion_lock);

	io_cqring_ev_posted(ctx);
	req_set_fail_links(req);
	io_double_put_req(req);
}

static void io_async_buf_cancel(struct callback_head *cb)
{
	struct io_async_rw *rw;
	struct io_kiocb *req;

	rw = container_of(cb, struct io_async_rw, task_work);
	req = rw->wpq.wait.private;
	__io_async_buf_error(req, -ECANCELED);
}

static void io_async_buf_retry(struct callback_head *cb)
{
	struct io_async_rw *rw;
	struct io_ring_ctx *ctx;
	struct io_kiocb *req;

	rw = container_of(cb, struct io_async_rw, task_work);
	req = rw->wpq.wait.private;
	ctx = req->ctx;

	__set_current_state(TASK_RUNNING);
	if (!io_sq_thread_acquire_mm(ctx, req)) {
		mutex_lock(&ctx->uring_lock);
		__io_queue_sqe(req, NULL);
		mutex_unlock(&ctx->uring_lock);
	} else {
		__io_async_buf_error(req, -EFAULT);
	}
}

static int io_async_buf_func(struct wait_queue_entry *wait, unsigned mode,
			     int sync, void *arg)
{
	struct wait_page_queue *wpq;
	struct io_kiocb *req = wait->private;
	struct io_async_rw *rw = &req->io->rw;
	struct wait_page_key *key = arg;
	struct task_struct *tsk;
	int ret;

	wpq = container_of(wait, struct wait_page_queue, wait);

	ret = wake_page_match(wpq, key);
	if (ret != 1)
		return ret;

	list_del_init(&wait->entry);

	init_task_work(&rw->task_work, io_async_buf_retry);
	/* submit ref gets dropped, acquire a new one */
	refcount_inc(&req->refs);
	tsk = req->task;
	ret = task_work_add(tsk, &rw->task_work, true);
	if (unlikely(ret)) {
		/* queue just for cancelation */
		init_task_work(&rw->task_work, io_async_buf_cancel);
		tsk = io_wq_get_task(req->ctx->io_wq);
		task_work_add(tsk, &rw->task_work, true);
	}
	wake_up_process(tsk);
	return 1;
}

static bool io_rw_should_retry(struct io_kiocb *req)
{
	struct kiocb *kiocb = &req->rw.kiocb;
	int ret;

	/* never retry for NOWAIT, we just complete with -EAGAIN */
	if (req->flags & REQ_F_NOWAIT)
		return false;

	/* already tried, or we're doing O_DIRECT */
	if (kiocb->ki_flags & (IOCB_DIRECT | IOCB_WAITQ))
		return false;
	/*
	 * just use poll if we can, and don't attempt if the fs doesn't
	 * support callback based unlocks
	 */
	if (file_can_poll(req->file) || !(req->file->f_mode & FMODE_BUF_RASYNC))
		return false;

	/*
	 * If request type doesn't require req->io to defer in general,
	 * we need to allocate it here
	 */
	if (!req->io && __io_alloc_async_ctx(req))
		return false;

	ret = kiocb_wait_page_queue_init(kiocb, &req->io->rw.wpq,
						io_async_buf_func, req);
	if (!ret) {
		io_get_req_task(req);
		return true;
	}

	return false;
}

static int io_iter_do_read(struct io_kiocb *req, struct iov_iter *iter)
{
	if (req->file->f_op->read_iter)
		return call_read_iter(req->file, &req->rw.kiocb, iter);
	return loop_rw_iter(READ, req->file, &req->rw.kiocb, iter);
}

static int io_read(struct io_kiocb *req, bool force_nonblock)
{
	struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
@@ -2651,17 +2904,17 @@ static int io_read(struct io_kiocb *req, bool force_nonblock)
	iov_count = iov_iter_count(&iter);
	ret = rw_verify_area(READ, req->file, &kiocb->ki_pos, iov_count);
	if (!ret) {
		ssize_t ret2;
		unsigned long nr_segs = iter.nr_segs;
		ssize_t ret2 = 0;

		if (req->file->f_op->read_iter)
			ret2 = call_read_iter(req->file, kiocb, &iter);
		else
			ret2 = loop_rw_iter(READ, req->file, kiocb, &iter);
		ret2 = io_iter_do_read(req, &iter);

		/* Catch -EAGAIN return for forced non-blocking submission */
		if (!force_nonblock || ret2 != -EAGAIN) {
		if (!force_nonblock || (ret2 != -EAGAIN && ret2 != -EIO)) {
			kiocb_done(kiocb, ret2);
		} else {
			iter.count = iov_count;
			iter.nr_segs = nr_segs;
copy_iov:
			ret = io_setup_async_rw(req, io_size, iovec,
						inline_vecs, &iter);
@@ -2671,6 +2924,17 @@ static int io_read(struct io_kiocb *req, bool force_nonblock)
			if (!(req->flags & REQ_F_NOWAIT) &&
			    !file_can_poll(req->file))
				req->flags |= REQ_F_MUST_PUNT;
			/* if we can retry, do so with the callbacks armed */
			if (io_rw_should_retry(req)) {
				ret2 = io_iter_do_read(req, &iter);
				if (ret2 == -EIOCBQUEUED) {
					goto out_free;
				} else if (ret2 != -EAGAIN) {
					kiocb_done(kiocb, ret2);
					goto out_free;
				}
			}
			kiocb->ki_flags &= ~IOCB_WAITQ;
			return -EAGAIN;
		}
	}
@@ -2748,6 +3012,7 @@ static int io_write(struct io_kiocb *req, bool force_nonblock)
	iov_count = iov_iter_count(&iter);
	ret = rw_verify_area(WRITE, req->file, &kiocb->ki_pos, iov_count);
	if (!ret) {
		unsigned long nr_segs = iter.nr_segs;
		ssize_t ret2;

		/*
@@ -2785,6 +3050,8 @@ static int io_write(struct io_kiocb *req, bool force_nonblock)
		if (!force_nonblock || ret2 != -EAGAIN) {
			kiocb_done(kiocb, ret2);
		} else {
			iter.count = iov_count;
			iter.nr_segs = nr_segs;
copy_iov:
			ret = io_setup_async_rw(req, io_size, iovec,
						inline_vecs, &iter);
@@ -4265,28 +4532,6 @@ static void io_async_queue_proc(struct file *file, struct wait_queue_head *head,
	__io_queue_proc(&pt->req->apoll->poll, pt, head);
}

static void io_sq_thread_drop_mm(struct io_ring_ctx *ctx)
{
	struct mm_struct *mm = current->mm;

	if (mm) {
		kthread_unuse_mm(mm);
		mmput(mm);
	}
}

static int io_sq_thread_acquire_mm(struct io_ring_ctx *ctx,
				   struct io_kiocb *req)
{
	if (io_op_defs[req->opcode].needs_mm && !current->mm) {
		if (unlikely(!mmget_not_zero(ctx->sqo_mm)))
			return -EFAULT;
		kthread_use_mm(ctx->sqo_mm);
	}

	return 0;
}

static void io_async_task_func(struct callback_head *cb)
{
	struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work);
@@ -5797,6 +6042,9 @@ static void io_submit_state_start(struct io_submit_state *state,
				  unsigned int max_ios)
{
	blk_start_plug(&state->plug);
#ifdef CONFIG_BLOCK
	state->plug.nowait = true;
#endif
	state->free_reqs = 0;
	state->file = NULL;
	state->ios_left = max_ios;
@@ -5914,7 +6162,7 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req,
static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
			  struct file *ring_file, int ring_fd)
{
	struct io_submit_state state, *statep = NULL;
	struct io_submit_state state;
	struct io_kiocb *link = NULL;
	int i, submitted = 0;

@@ -5931,10 +6179,7 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
	if (!percpu_ref_tryget_many(&ctx->refs, nr))
		return -EAGAIN;

	if (nr > IO_PLUG_THRESHOLD) {
	io_submit_state_start(&state, nr);
		statep = &state;
	}

	ctx->ring_fd = ring_fd;
	ctx->ring_file = ring_file;
@@ -5949,14 +6194,14 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
			io_consume_sqe(ctx);
			break;
		}
		req = io_alloc_req(ctx, statep);
		req = io_alloc_req(ctx, &state);
		if (unlikely(!req)) {
			if (!submitted)
				submitted = -EAGAIN;
			break;
		}

		err = io_init_req(ctx, req, sqe, statep);
		err = io_init_req(ctx, req, sqe, &state);
		io_consume_sqe(ctx);
		/* will complete beyond this point, count as submitted */
		submitted++;
@@ -5982,7 +6227,6 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
	}
	if (link)
		io_queue_link_head(link);
	if (statep)
	io_submit_state_end(&state);

	 /* Commit SQ ring head once we've consumed and submitted all SQEs */
+1 −1
Original line number Diff line number Diff line
@@ -1080,7 +1080,7 @@ xfs_file_open(
		return -EFBIG;
	if (XFS_FORCED_SHUTDOWN(XFS_M(inode->i_sb)))
		return -EIO;
	file->f_mode |= FMODE_NOWAIT;
	file->f_mode |= FMODE_NOWAIT | FMODE_BUF_RASYNC;
	return 0;
}

Loading