Diffstat (limited to 'fs/aio.c')
-rw-r--r--	fs/aio.c	726
1 files changed, 466 insertions, 260 deletions
diff --git a/fs/aio.c b/fs/aio.c
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -26,6 +26,7 @@
 #include <linux/mm.h>
 #include <linux/mman.h>
 #include <linux/mmu_context.h>
+#include <linux/percpu.h>
 #include <linux/slab.h>
 #include <linux/timer.h>
 #include <linux/aio.h>
@@ -35,6 +36,10 @@
 #include <linux/eventfd.h>
 #include <linux/blkdev.h>
 #include <linux/compat.h>
+#include <linux/anon_inodes.h>
+#include <linux/migrate.h>
+#include <linux/ramfs.h>
+#include <linux/percpu-refcount.h>
 
 #include <asm/kmap_types.h>
 #include <asm/uaccess.h>
@@ -61,14 +66,29 @@ struct aio_ring {
 
 #define AIO_RING_PAGES	8
 
+struct kioctx_table {
+	struct rcu_head	rcu;
+	unsigned	nr;
+	struct kioctx	*table[];
+};
+
+struct kioctx_cpu {
+	unsigned		reqs_available;
+};
+
 struct kioctx {
-	atomic_t		users;
+	struct percpu_ref	users;
 	atomic_t		dead;
 
-	/* This needs improving */
 	unsigned long		user_id;
-	struct hlist_node	list;
+
+	struct __percpu kioctx_cpu *cpu;
+
+	/*
+	 * For percpu reqs_available, number of slots we move to/from global
+	 * counter at a time:
+	 */
+	unsigned		req_batch;
 
 	/*
 	 * This is what userspace passed to io_setup(), it's not used for
 	 * anything but counting against the global max_reqs quota.
@@ -88,10 +108,18 @@ struct kioctx {
 	long			nr_pages;
 
 	struct rcu_head		rcu_head;
-	struct work_struct	rcu_work;
+	struct work_struct	free_work;
 
 	struct {
-		atomic_t	reqs_active;
+		/*
+		 * This counts the number of available slots in the ringbuffer,
+		 * so we avoid overflowing it: it's decremented (if positive)
+		 * when allocating a kiocb and incremented when the resulting
+		 * io_event is pulled off the ringbuffer.
+		 *
+		 * We batch accesses to it with a percpu version.
+		 */
		atomic_t	reqs_available;
 	} ____cacheline_aligned_in_smp;
 
 	struct {
@@ -110,6 +138,9 @@ struct kioctx {
 	} ____cacheline_aligned_in_smp;
 
 	struct page		*internal_pages[AIO_RING_PAGES];
+	struct file		*aio_ring_file;
+
+	unsigned		id;
 };
 
 /*------ sysctl variables----*/
@@ -138,15 +169,77 @@ __initcall(aio_setup);
 
 static void aio_free_ring(struct kioctx *ctx)
 {
-	long i;
+	int i;
+	struct file *aio_ring_file = ctx->aio_ring_file;
 
-	for (i = 0; i < ctx->nr_pages; i++)
+	for (i = 0; i < ctx->nr_pages; i++) {
+		pr_debug("pid(%d) [%d] page->count=%d\n", current->pid, i,
+				page_count(ctx->ring_pages[i]));
 		put_page(ctx->ring_pages[i]);
+	}
 
 	if (ctx->ring_pages && ctx->ring_pages != ctx->internal_pages)
 		kfree(ctx->ring_pages);
+
+	if (aio_ring_file) {
+		truncate_setsize(aio_ring_file->f_inode, 0);
+		fput(aio_ring_file);
+		ctx->aio_ring_file = NULL;
+	}
+}
+
+static int aio_ring_mmap(struct file *file, struct vm_area_struct *vma)
+{
+	vma->vm_ops = &generic_file_vm_ops;
+	return 0;
 }
 
+static const struct file_operations aio_ring_fops = {
+	.mmap = aio_ring_mmap,
+};
+
+static int aio_set_page_dirty(struct page *page)
+{
+	return 0;
+}
+
+#if IS_ENABLED(CONFIG_MIGRATION)
+static int aio_migratepage(struct address_space *mapping, struct page *new,
+			struct page *old, enum migrate_mode mode)
+{
+	struct kioctx *ctx = mapping->private_data;
+	unsigned long flags;
+	unsigned idx = old->index;
+	int rc;
+
+	/* Writeback must be complete */
+	BUG_ON(PageWriteback(old));
+	put_page(old);
+
+	rc = migrate_page_move_mapping(mapping, new, old, NULL, mode);
+	if (rc != MIGRATEPAGE_SUCCESS) {
+		get_page(old);
+		return rc;
+	}
+
+	get_page(new);
+
+	spin_lock_irqsave(&ctx->completion_lock, flags);
+	migrate_page_copy(new, old);
+	ctx->ring_pages[idx] = new;
+	spin_unlock_irqrestore(&ctx->completion_lock, flags);
+
+	return rc;
+}
+#endif
+
+static const struct address_space_operations aio_ctx_aops = {
+	.set_page_dirty = aio_set_page_dirty,
+#if IS_ENABLED(CONFIG_MIGRATION)
+	.migratepage	= aio_migratepage,
+#endif
+};
+
 static int aio_setup_ring(struct kioctx *ctx)
 {
 	struct aio_ring *ring;
@@ -154,20 +247,45 @@ static int aio_setup_ring(struct kioctx *ctx)
 	struct mm_struct *mm = current->mm;
 	unsigned long size, populate;
 	int nr_pages;
+	int i;
+	struct file *file;
 
 	/* Compensate for the ring buffer's head/tail overlap entry */
 	nr_events += 2;	/* 1 is required, 2 for good luck */
 
 	size = sizeof(struct aio_ring);
 	size += sizeof(struct io_event) * nr_events;
-	nr_pages = (size + PAGE_SIZE-1) >> PAGE_SHIFT;
 
+	nr_pages = PFN_UP(size);
 	if (nr_pages < 0)
 		return -EINVAL;
 
-	nr_events = (PAGE_SIZE * nr_pages - sizeof(struct aio_ring)) / sizeof(struct io_event);
+	file = anon_inode_getfile_private("[aio]", &aio_ring_fops, ctx, O_RDWR);
+	if (IS_ERR(file)) {
+		ctx->aio_ring_file = NULL;
+		return -EAGAIN;
+	}
+
+	file->f_inode->i_mapping->a_ops = &aio_ctx_aops;
+	file->f_inode->i_mapping->private_data = ctx;
+	file->f_inode->i_size = PAGE_SIZE * (loff_t)nr_pages;
+
+	for (i = 0; i < nr_pages; i++) {
+		struct page *page;
+		page = find_or_create_page(file->f_inode->i_mapping,
+					   i, GFP_HIGHUSER | __GFP_ZERO);
+		if (!page)
+			break;
+		pr_debug("pid(%d) page[%d]->count=%d\n",
+			 current->pid, i, page_count(page));
+		SetPageUptodate(page);
+		SetPageDirty(page);
+		unlock_page(page);
+	}
+	ctx->aio_ring_file = file;
+	nr_events = (PAGE_SIZE * nr_pages - sizeof(struct aio_ring))
+			/ sizeof(struct io_event);
 
-	ctx->nr_events = 0;
 	ctx->ring_pages = ctx->internal_pages;
 	if (nr_pages > AIO_RING_PAGES) {
 		ctx->ring_pages = kcalloc(nr_pages, sizeof(struct page *),
@@ -178,10 +296,11 @@ static int aio_setup_ring(struct kioctx *ctx)
 
 	ctx->mmap_size = nr_pages * PAGE_SIZE;
 	pr_debug("attempting mmap of %lu bytes\n", ctx->mmap_size);
+
 	down_write(&mm->mmap_sem);
-	ctx->mmap_base = do_mmap_pgoff(NULL, 0, ctx->mmap_size,
-				       PROT_READ|PROT_WRITE,
-				       MAP_ANONYMOUS|MAP_PRIVATE, 0, &populate);
+	ctx->mmap_base = do_mmap_pgoff(ctx->aio_ring_file, 0, ctx->mmap_size,
+				       PROT_READ | PROT_WRITE,
+				       MAP_SHARED | MAP_POPULATE, 0, &populate);
 	if (IS_ERR((void *)ctx->mmap_base)) {
 		up_write(&mm->mmap_sem);
 		ctx->mmap_size = 0;
@@ -190,23 +309,34 @@ static int aio_setup_ring(struct kioctx *ctx)
 	}
 
 	pr_debug("mmap address: 0x%08lx\n", ctx->mmap_base);
+
+	/* We must do this while still holding mmap_sem for write, as we
+	 * need to be protected against userspace attempting to mremap()
+	 * or munmap() the ring buffer.
+	 */
 	ctx->nr_pages = get_user_pages(current, mm, ctx->mmap_base, nr_pages,
 				       1, 0, ctx->ring_pages, NULL);
+
+	/* Dropping the reference here is safe as the page cache will hold
+	 * onto the pages for us.  It is also required so that page migration
+	 * can unmap the pages and get the right reference count.
+	 */
+	for (i = 0; i < ctx->nr_pages; i++)
+		put_page(ctx->ring_pages[i]);
+
 	up_write(&mm->mmap_sem);
 
 	if (unlikely(ctx->nr_pages != nr_pages)) {
 		aio_free_ring(ctx);
 		return -EAGAIN;
 	}
-	if (populate)
-		mm_populate(ctx->mmap_base, populate);
 
 	ctx->user_id = ctx->mmap_base;
 	ctx->nr_events = nr_events; /* trusted copy */
 
 	ring = kmap_atomic(ctx->ring_pages[0]);
 	ring->nr = nr_events;	/* user copy */
-	ring->id = ctx->user_id;
+	ring->id = ~0U;
 	ring->head = ring->tail = 0;
 	ring->magic = AIO_RING_MAGIC;
 	ring->compat_features = AIO_RING_COMPAT_FEATURES;
@@ -238,11 +368,9 @@ void kiocb_set_cancel_fn(struct kiocb *req, kiocb_cancel_fn *cancel)
 }
 EXPORT_SYMBOL(kiocb_set_cancel_fn);
 
-static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
-			struct io_event *res)
+static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb)
 {
 	kiocb_cancel_fn *old, *cancel;
-	int ret = -EINVAL;
 
 	/*
 	 * Don't want to set kiocb->ki_cancel = KIOCB_CANCELLED unless it
@@ -252,28 +380,20 @@ static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
 	cancel = ACCESS_ONCE(kiocb->ki_cancel);
 	do {
 		if (!cancel || cancel == KIOCB_CANCELLED)
-			return ret;
+			return -EINVAL;
 
 		old = cancel;
 		cancel = cmpxchg(&kiocb->ki_cancel, old, KIOCB_CANCELLED);
 	} while (cancel != old);
 
-	atomic_inc(&kiocb->ki_users);
-	spin_unlock_irq(&ctx->ctx_lock);
-
-	memset(res, 0, sizeof(*res));
-	res->obj = (u64)(unsigned long)kiocb->ki_obj.user;
-	res->data = kiocb->ki_user_data;
-	ret = cancel(kiocb, res);
-
-	spin_lock_irq(&ctx->ctx_lock);
-
-	return ret;
+	return cancel(kiocb);
 }
 
 static void free_ioctx_rcu(struct rcu_head *head)
 {
 	struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
+
+	free_percpu(ctx->cpu);
 	kmem_cache_free(kioctx_cachep, ctx);
 }
 
@@ -282,12 +402,13 @@ static void free_ioctx_rcu(struct rcu_head *head)
  * and ctx->users has dropped to 0, so we know no more kiocbs can be submitted -
  * now it's safe to cancel any that need to be.
 */
-static void free_ioctx(struct kioctx *ctx)
+static void free_ioctx(struct work_struct *work)
 {
+	struct kioctx *ctx = container_of(work, struct kioctx, free_work);
 	struct aio_ring *ring;
-	struct io_event res;
 	struct kiocb *req;
-	unsigned head, avail;
+	unsigned cpu, avail;
+	DEFINE_WAIT(wait);
 
 	spin_lock_irq(&ctx->ctx_lock);
 
@@ -296,28 +417,38 @@ static void free_ioctx(struct kioctx *ctx)
 				       struct kiocb, ki_list);
 
 		list_del_init(&req->ki_list);
-		kiocb_cancel(ctx, req, &res);
+		kiocb_cancel(ctx, req);
 	}
 
 	spin_unlock_irq(&ctx->ctx_lock);
 
-	ring = kmap_atomic(ctx->ring_pages[0]);
-	head = ring->head;
-	kunmap_atomic(ring);
+	for_each_possible_cpu(cpu) {
+		struct kioctx_cpu *kcpu = per_cpu_ptr(ctx->cpu, cpu);
 
-	while (atomic_read(&ctx->reqs_active) > 0) {
-		wait_event(ctx->wait,
-				head != ctx->tail ||
-				atomic_read(&ctx->reqs_active) <= 0);
+		atomic_add(kcpu->reqs_available, &ctx->reqs_available);
+		kcpu->reqs_available = 0;
+	}
 
-		avail = (head <= ctx->tail ? ctx->tail : ctx->nr_events) - head;
+	while (1) {
+		prepare_to_wait(&ctx->wait, &wait, TASK_UNINTERRUPTIBLE);
 
-		atomic_sub(avail, &ctx->reqs_active);
-		head += avail;
-		head %= ctx->nr_events;
+		ring = kmap_atomic(ctx->ring_pages[0]);
+		avail = (ring->head <= ring->tail)
+			 ? ring->tail - ring->head
+			 : ctx->nr_events - ring->head + ring->tail;
+
+		atomic_add(avail, &ctx->reqs_available);
+		ring->head = ring->tail;
+		kunmap_atomic(ring);
+
+		if (atomic_read(&ctx->reqs_available) >= ctx->nr_events - 1)
+			break;
+
+		schedule();
 	}
+	finish_wait(&ctx->wait, &wait);
 
-	WARN_ON(atomic_read(&ctx->reqs_active) < 0);
+	WARN_ON(atomic_read(&ctx->reqs_available) > ctx->nr_events - 1);
 
 	aio_free_ring(ctx);
 
@@ -333,10 +464,68 @@ static void free_ioctx(struct kioctx *ctx)
 	call_rcu(&ctx->rcu_head, free_ioctx_rcu);
 }
 
-static void put_ioctx(struct kioctx *ctx)
+static void free_ioctx_ref(struct percpu_ref *ref)
 {
-	if (unlikely(atomic_dec_and_test(&ctx->users)))
-		free_ioctx(ctx);
+	struct kioctx *ctx = container_of(ref, struct kioctx, users);
+
+	INIT_WORK(&ctx->free_work, free_ioctx);
+	schedule_work(&ctx->free_work);
+}
+
+static int ioctx_add_table(struct kioctx *ctx, struct mm_struct *mm)
+{
+	unsigned i, new_nr;
+	struct kioctx_table *table, *old;
+	struct aio_ring *ring;
+
+	spin_lock(&mm->ioctx_lock);
+	rcu_read_lock();
+	table = rcu_dereference(mm->ioctx_table);
+
+	while (1) {
+		if (table)
+			for (i = 0; i < table->nr; i++)
+				if (!table->table[i]) {
+					ctx->id = i;
+					table->table[i] = ctx;
+					rcu_read_unlock();
+					spin_unlock(&mm->ioctx_lock);
+
+					ring = kmap_atomic(ctx->ring_pages[0]);
+					ring->id = ctx->id;
+					kunmap_atomic(ring);
+					return 0;
+				}
+
+		new_nr = (table ? table->nr : 1) * 4;
+
+		rcu_read_unlock();
+		spin_unlock(&mm->ioctx_lock);
+
+		table = kzalloc(sizeof(*table) + sizeof(struct kioctx *) *
+				new_nr, GFP_KERNEL);
+		if (!table)
+			return -ENOMEM;
+
+		table->nr = new_nr;
+
+		spin_lock(&mm->ioctx_lock);
+		rcu_read_lock();
+		old = rcu_dereference(mm->ioctx_table);
+
+		if (!old) {
+			rcu_assign_pointer(mm->ioctx_table, table);
+		} else if (table->nr > old->nr) {
+			memcpy(table->table, old->table,
+			       old->nr * sizeof(struct kioctx *));
+
+			rcu_assign_pointer(mm->ioctx_table, table);
+			kfree_rcu(old, rcu);
+		} else {
+			kfree(table);
+			table = old;
+		}
+	}
 }
 
 /* ioctx_alloc
@@ -348,6 +537,18 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 	struct kioctx *ctx;
 	int err = -ENOMEM;
 
+	/*
+	 * We keep track of the number of available ringbuffer slots, to prevent
+	 * overflow (reqs_available), and we also use percpu counters for this.
+	 *
+	 * So since up to half the slots might be on other cpu's percpu counters
+	 * and unavailable, double nr_events so userspace sees what they
+	 * expected: additionally, we move req_batch slots to/from percpu
+	 * counters at a time, so make sure that isn't 0:
+	 */
+	nr_events = max(nr_events, num_possible_cpus() * 4);
+	nr_events *= 2;
+
 	/* Prevent overflows */
 	if ((nr_events > (0x10000000U / sizeof(struct io_event))) ||
 	    (nr_events > (0x10000000U / sizeof(struct kiocb)))) {
@@ -355,7 +556,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 		return ERR_PTR(-EINVAL);
 	}
 
-	if (!nr_events || (unsigned long)nr_events > aio_max_nr)
+	if (!nr_events || (unsigned long)nr_events > (aio_max_nr * 2UL))
 		return ERR_PTR(-EAGAIN);
 
 	ctx = kmem_cache_zalloc(kioctx_cachep, GFP_KERNEL);
@@ -364,8 +565,9 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 
 	ctx->max_reqs = nr_events;
 
-	atomic_set(&ctx->users, 2);
-	atomic_set(&ctx->dead, 0);
+	if (percpu_ref_init(&ctx->users, free_ioctx_ref))
+		goto out_freectx;
+
 	spin_lock_init(&ctx->ctx_lock);
 	spin_lock_init(&ctx->completion_lock);
 	mutex_init(&ctx->ring_lock);
@@ -373,12 +575,21 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 
 	INIT_LIST_HEAD(&ctx->active_reqs);
 
+	ctx->cpu = alloc_percpu(struct kioctx_cpu);
+	if (!ctx->cpu)
+		goto out_freeref;
+
 	if (aio_setup_ring(ctx) < 0)
-		goto out_freectx;
+		goto out_freepcpu;
+
+	atomic_set(&ctx->reqs_available, ctx->nr_events - 1);
+	ctx->req_batch = (ctx->nr_events - 1) / (num_possible_cpus() * 4);
+	if (ctx->req_batch < 1)
+		ctx->req_batch = 1;
 
 	/* limit the number of system wide aios */
 	spin_lock(&aio_nr_lock);
-	if (aio_nr + nr_events > aio_max_nr ||
+	if (aio_nr + nr_events > (aio_max_nr * 2UL) ||
 	    aio_nr + nr_events < aio_nr) {
 		spin_unlock(&aio_nr_lock);
 		goto out_cleanup;
@@ -386,49 +597,54 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 	aio_nr += ctx->max_reqs;
 	spin_unlock(&aio_nr_lock);
 
-	/* now link into global list. */
-	spin_lock(&mm->ioctx_lock);
-	hlist_add_head_rcu(&ctx->list, &mm->ioctx_list);
-	spin_unlock(&mm->ioctx_lock);
+	percpu_ref_get(&ctx->users); /* io_setup() will drop this ref */
+
+	err = ioctx_add_table(ctx, mm);
+	if (err)
+		goto out_cleanup_put;
 
 	pr_debug("allocated ioctx %p[%ld]: mm=%p mask=0x%x\n",
 		 ctx, ctx->user_id, mm, ctx->nr_events);
 	return ctx;
 
+out_cleanup_put:
+	percpu_ref_put(&ctx->users);
 out_cleanup:
 	err = -EAGAIN;
 	aio_free_ring(ctx);
+out_freepcpu:
+	free_percpu(ctx->cpu);
+out_freeref:
+	free_percpu(ctx->users.pcpu_count);
 out_freectx:
+	if (ctx->aio_ring_file)
+		fput(ctx->aio_ring_file);
 	kmem_cache_free(kioctx_cachep, ctx);
 	pr_debug("error allocating ioctx %d\n", err);
 	return ERR_PTR(err);
 }
 
-static void kill_ioctx_work(struct work_struct *work)
-{
-	struct kioctx *ctx = container_of(work, struct kioctx, rcu_work);
-
-	wake_up_all(&ctx->wait);
-	put_ioctx(ctx);
-}
-
-static void kill_ioctx_rcu(struct rcu_head *head)
-{
-	struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
-
-	INIT_WORK(&ctx->rcu_work, kill_ioctx_work);
-	schedule_work(&ctx->rcu_work);
-}
-
 /* kill_ioctx
  *	Cancels all outstanding aio requests on an aio context.  Used
  *	when the processes owning a context have all exited to encourage
  *	the rapid destruction of the kioctx.
  */
-static void kill_ioctx(struct kioctx *ctx)
+static void kill_ioctx(struct mm_struct *mm, struct kioctx *ctx)
 {
 	if (!atomic_xchg(&ctx->dead, 1)) {
-		hlist_del_rcu(&ctx->list);
+		struct kioctx_table *table;
+
+		spin_lock(&mm->ioctx_lock);
+		rcu_read_lock();
+		table = rcu_dereference(mm->ioctx_table);
+
+		WARN_ON(ctx != table->table[ctx->id]);
+		table->table[ctx->id] = NULL;
+		rcu_read_unlock();
+		spin_unlock(&mm->ioctx_lock);
+
+		/* percpu_ref_kill() will do the necessary call_rcu() */
+		wake_up_all(&ctx->wait);
 
 		/*
 		 * It'd be more correct to do this in free_ioctx(), after all
@@ -445,24 +661,23 @@ static void kill_ioctx(struct kioctx *ctx)
 		if (ctx->mmap_size)
 			vm_munmap(ctx->mmap_base, ctx->mmap_size);
 
-		/* Between hlist_del_rcu() and dropping the initial ref */
-		call_rcu(&ctx->rcu_head, kill_ioctx_rcu);
+		percpu_ref_kill(&ctx->users);
 	}
 }
 
 /* wait_on_sync_kiocb:
  *	Waits on the given sync kiocb to complete.
  */
-ssize_t wait_on_sync_kiocb(struct kiocb *iocb)
+ssize_t wait_on_sync_kiocb(struct kiocb *req)
 {
-	while (atomic_read(&iocb->ki_users)) {
+	while (!req->ki_ctx) {
 		set_current_state(TASK_UNINTERRUPTIBLE);
-		if (!atomic_read(&iocb->ki_users))
+		if (req->ki_ctx)
 			break;
 		io_schedule();
 	}
 	__set_current_state(TASK_RUNNING);
-	return iocb->ki_user_data;
+	return req->ki_user_data;
 }
 EXPORT_SYMBOL(wait_on_sync_kiocb);
 
@@ -476,16 +691,28 @@ EXPORT_SYMBOL(wait_on_sync_kiocb);
  */
 void exit_aio(struct mm_struct *mm)
 {
+	struct kioctx_table *table;
 	struct kioctx *ctx;
-	struct hlist_node *n;
-
-	hlist_for_each_entry_safe(ctx, n, &mm->ioctx_list, list) {
-		if (1 != atomic_read(&ctx->users))
-			printk(KERN_DEBUG
-				"exit_aio:ioctx still alive: %d %d %d\n",
-				atomic_read(&ctx->users),
-				atomic_read(&ctx->dead),
-				atomic_read(&ctx->reqs_active));
+	unsigned i = 0;
+
+	while (1) {
+		rcu_read_lock();
+		table = rcu_dereference(mm->ioctx_table);
+
+		do {
+			if (!table || i >= table->nr) {
+				rcu_read_unlock();
+				rcu_assign_pointer(mm->ioctx_table, NULL);
+				if (table)
+					kfree(table);
+				return;
+			}
+
+			ctx = table->table[i++];
+		} while (!ctx);
+
+		rcu_read_unlock();
+
 		/*
 		 * We don't need to bother with munmap() here -
 		 * exit_mmap(mm) is coming and it'll unmap everything.
@@ -496,40 +723,75 @@ void exit_aio(struct mm_struct *mm)
 		 */
 		ctx->mmap_size = 0;
 
-		kill_ioctx(ctx);
+		kill_ioctx(mm, ctx);
+	}
+}
+
+static void put_reqs_available(struct kioctx *ctx, unsigned nr)
+{
+	struct kioctx_cpu *kcpu;
+
+	preempt_disable();
+	kcpu = this_cpu_ptr(ctx->cpu);
+
+	kcpu->reqs_available += nr;
+	while (kcpu->reqs_available >= ctx->req_batch * 2) {
+		kcpu->reqs_available -= ctx->req_batch;
+		atomic_add(ctx->req_batch, &ctx->reqs_available);
+	}
+
+	preempt_enable();
+}
+
+static bool get_reqs_available(struct kioctx *ctx)
+{
+	struct kioctx_cpu *kcpu;
+	bool ret = false;
+
+	preempt_disable();
+	kcpu = this_cpu_ptr(ctx->cpu);
+
+	if (!kcpu->reqs_available) {
+		int old, avail = atomic_read(&ctx->reqs_available);
+
+		do {
+			if (avail < ctx->req_batch)
+				goto out;
+
+			old = avail;
+			avail = atomic_cmpxchg(&ctx->reqs_available,
+					       avail, avail - ctx->req_batch);
+		} while (avail != old);
+
+		kcpu->reqs_available += ctx->req_batch;
 	}
+
+	ret = true;
+	kcpu->reqs_available--;
+out:
+	preempt_enable();
+	return ret;
 }
 
 /* aio_get_req
- *	Allocate a slot for an aio request.  Increments the ki_users count
- * of the kioctx so that the kioctx stays around until all requests are
- * complete.  Returns NULL if no requests are free.
- *
- * Returns with kiocb->ki_users set to 2.  The io submit code path holds
- * an extra reference while submitting the i/o.
- * This prevents races between the aio code path referencing the
- * req (after submitting it) and aio_complete() freeing the req.
+ *	Allocate a slot for an aio request.
+ * Returns NULL if no requests are free.
 */
 static inline struct kiocb *aio_get_req(struct kioctx *ctx)
 {
 	struct kiocb *req;
 
-	if (atomic_read(&ctx->reqs_active) >= ctx->nr_events)
+	if (!get_reqs_available(ctx))
 		return NULL;
 
-	if (atomic_inc_return(&ctx->reqs_active) > ctx->nr_events - 1)
-		goto out_put;
-
 	req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO);
 	if (unlikely(!req))
 		goto out_put;
 
-	atomic_set(&req->ki_users, 2);
 	req->ki_ctx = ctx;
-
 	return req;
 out_put:
-	atomic_dec(&ctx->reqs_active);
+	put_reqs_available(ctx, 1);
 	return NULL;
 }
 
@@ -539,35 +801,32 @@ static void kiocb_free(struct kiocb *req)
 		fput(req->ki_filp);
 	if (req->ki_eventfd != NULL)
 		eventfd_ctx_put(req->ki_eventfd);
-	if (req->ki_dtor)
-		req->ki_dtor(req);
-	if (req->ki_iovec != &req->ki_inline_vec)
-		kfree(req->ki_iovec);
 	kmem_cache_free(kiocb_cachep, req);
 }
 
-void aio_put_req(struct kiocb *req)
-{
-	if (atomic_dec_and_test(&req->ki_users))
-		kiocb_free(req);
-}
-EXPORT_SYMBOL(aio_put_req);
-
 static struct kioctx *lookup_ioctx(unsigned long ctx_id)
 {
+	struct aio_ring __user *ring  = (void __user *)ctx_id;
 	struct mm_struct *mm = current->mm;
 	struct kioctx *ctx, *ret = NULL;
+	struct kioctx_table *table;
+	unsigned id;
+
+	if (get_user(id, &ring->id))
+		return NULL;
 
 	rcu_read_lock();
+	table = rcu_dereference(mm->ioctx_table);
 
-	hlist_for_each_entry_rcu(ctx, &mm->ioctx_list, list) {
-		if (ctx->user_id == ctx_id) {
-			atomic_inc(&ctx->users);
-			ret = ctx;
-			break;
-		}
-	}
+	if (!table || id >= table->nr)
+		goto out;
 
+	ctx = table->table[id];
+	if (ctx && ctx->user_id == ctx_id) {
+		percpu_ref_get(&ctx->users);
+		ret = ctx;
	}
+out:
 	rcu_read_unlock();
 	return ret;
 }
@@ -591,16 +850,16 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
 	 *  - the sync task helpfully left a reference to itself in the iocb
 	 */
 	if (is_sync_kiocb(iocb)) {
-		BUG_ON(atomic_read(&iocb->ki_users) != 1);
 		iocb->ki_user_data = res;
-		atomic_set(&iocb->ki_users, 0);
+		smp_wmb();
+		iocb->ki_ctx = ERR_PTR(-EXDEV);
 		wake_up_process(iocb->ki_obj.tsk);
 		return;
 	}
 
 	/*
 	 * Take rcu_read_lock() in case the kioctx is being destroyed, as we
-	 * need to issue a wakeup after decrementing reqs_active.
+	 * need to issue a wakeup after incrementing reqs_available.
 	 */
 	rcu_read_lock();
 
@@ -613,17 +872,6 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
 	}
 
 	/*
-	 * cancelled requests don't get events, userland was given one
-	 * when the event got cancelled.
-	 */
-	if (unlikely(xchg(&iocb->ki_cancel,
-			  KIOCB_CANCELLED) == KIOCB_CANCELLED)) {
-		atomic_dec(&ctx->reqs_active);
-		/* Still need the wake_up in case free_ioctx is waiting */
-		goto put_rq;
-	}
-
-	/*
 	 * Add a completion event to the ring buffer. Must be done holding
 	 * ctx->completion_lock to prevent other code from messing with the tail
 	 * pointer since we might be called from irq context.
@@ -675,9 +923,8 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
 	if (iocb->ki_eventfd != NULL)
 		eventfd_signal(iocb->ki_eventfd, 1);
 
-put_rq:
 	/* everything turned out well, dispose of the aiocb. */
-	aio_put_req(iocb);
+	kiocb_free(iocb);
 
 	/*
 	 * We have to order our ring_info tail store above and test
@@ -702,7 +949,7 @@ static long aio_read_events_ring(struct kioctx *ctx,
 				 struct io_event __user *event, long nr)
 {
 	struct aio_ring *ring;
-	unsigned head, pos;
+	unsigned head, tail, pos;
 	long ret = 0;
 	int copy_ret;
 
@@ -710,11 +957,12 @@ static long aio_read_events_ring(struct kioctx *ctx,
 
 	ring = kmap_atomic(ctx->ring_pages[0]);
 	head = ring->head;
+	tail = ring->tail;
 	kunmap_atomic(ring);
 
-	pr_debug("h%u t%u m%u\n", head, ctx->tail, ctx->nr_events);
+	pr_debug("h%u t%u m%u\n", head, tail, ctx->nr_events);
 
-	if (head == ctx->tail)
+	if (head == tail)
 		goto out;
 
 	while (ret < nr) {
@@ -722,8 +970,8 @@ static long aio_read_events_ring(struct kioctx *ctx,
 		struct io_event *ev;
 		struct page *page;
 
-		avail = (head <= ctx->tail ? ctx->tail : ctx->nr_events) - head;
-		if (head == ctx->tail)
+		avail = (head <= tail ?  tail : ctx->nr_events) - head;
+		if (head == tail)
 			break;
 
 		avail = min(avail, nr - ret);
@@ -754,9 +1002,9 @@ static long aio_read_events_ring(struct kioctx *ctx,
 	kunmap_atomic(ring);
 	flush_dcache_page(ctx->ring_pages[0]);
 
-	pr_debug("%li  h%u t%u\n", ret, head, ctx->tail);
+	pr_debug("%li  h%u t%u\n", ret, head, tail);
 
-	atomic_sub(ret, &ctx->reqs_active);
+	put_reqs_available(ctx, ret);
 out:
 	mutex_unlock(&ctx->ring_lock);
 
@@ -854,8 +1102,8 @@ SYSCALL_DEFINE2(io_setup, unsigned, nr_events, aio_context_t __user *, ctxp)
 	if (!IS_ERR(ioctx)) {
 		ret = put_user(ioctx->user_id, ctxp);
 		if (ret)
-			kill_ioctx(ioctx);
-		put_ioctx(ioctx);
+			kill_ioctx(current->mm, ioctx);
+		percpu_ref_put(&ioctx->users);
 	}
 
 out:
@@ -872,101 +1120,37 @@ SYSCALL_DEFINE1(io_destroy, aio_context_t, ctx)
 {
 	struct kioctx *ioctx = lookup_ioctx(ctx);
 	if (likely(NULL != ioctx)) {
-		kill_ioctx(ioctx);
-		put_ioctx(ioctx);
+		kill_ioctx(current->mm, ioctx);
+		percpu_ref_put(&ioctx->users);
 		return 0;
 	}
 	pr_debug("EINVAL: io_destroy: invalid context id\n");
 	return -EINVAL;
 }
 
-static void aio_advance_iovec(struct kiocb *iocb, ssize_t ret)
-{
-	struct iovec *iov = &iocb->ki_iovec[iocb->ki_cur_seg];
-
-	BUG_ON(ret <= 0);
-
-	while (iocb->ki_cur_seg < iocb->ki_nr_segs && ret > 0) {
-		ssize_t this = min((ssize_t)iov->iov_len, ret);
-		iov->iov_base += this;
-		iov->iov_len -= this;
-		iocb->ki_left -= this;
-		ret -= this;
-		if (iov->iov_len == 0) {
-			iocb->ki_cur_seg++;
-			iov++;
-		}
-	}
-
-	/* the caller should not have done more io than what fit in
-	 * the remaining iovecs */
-	BUG_ON(ret > 0 && iocb->ki_left == 0);
-}
-
 typedef ssize_t (aio_rw_op)(struct kiocb *, const struct iovec *,
 			    unsigned long, loff_t);
 
-static ssize_t aio_rw_vect_retry(struct kiocb *iocb, int rw, aio_rw_op *rw_op)
-{
-	struct file *file = iocb->ki_filp;
-	struct address_space *mapping = file->f_mapping;
-	struct inode *inode = mapping->host;
-	ssize_t ret = 0;
-
-	/* This matches the pread()/pwrite() logic */
-	if (iocb->ki_pos < 0)
-		return -EINVAL;
-
-	if (rw == WRITE)
-		file_start_write(file);
-	do {
-		ret = rw_op(iocb, &iocb->ki_iovec[iocb->ki_cur_seg],
-			    iocb->ki_nr_segs - iocb->ki_cur_seg,
-			    iocb->ki_pos);
-		if (ret > 0)
-			aio_advance_iovec(iocb, ret);
-
-	/* retry all partial writes.  retry partial reads as long as its a
-	 * regular file. */
-	} while (ret > 0 && iocb->ki_left > 0 &&
-		 (rw == WRITE ||
-		  (!S_ISFIFO(inode->i_mode) && !S_ISSOCK(inode->i_mode))));
-	if (rw == WRITE)
-		file_end_write(file);
-
-	/* This means we must have transferred all that we could */
-	/* No need to retry anymore */
-	if ((ret == 0) || (iocb->ki_left == 0))
-		ret = iocb->ki_nbytes - iocb->ki_left;
-
-	/* If we managed to write some out we return that, rather than
-	 * the eventual error. */
-	if (rw == WRITE
-	    && ret < 0 && ret != -EIOCBQUEUED
-	    && iocb->ki_nbytes - iocb->ki_left)
-		ret = iocb->ki_nbytes - iocb->ki_left;
-
-	return ret;
-}
-
-static ssize_t aio_setup_vectored_rw(int rw, struct kiocb *kiocb, bool compat)
+static ssize_t aio_setup_vectored_rw(struct kiocb *kiocb,
+				     int rw, char __user *buf,
+				     unsigned long *nr_segs,
+				     struct iovec **iovec,
+				     bool compat)
 {
 	ssize_t ret;
 
-	kiocb->ki_nr_segs = kiocb->ki_nbytes;
+	*nr_segs = kiocb->ki_nbytes;
 
 #ifdef CONFIG_COMPAT
 	if (compat)
 		ret = compat_rw_copy_check_uvector(rw,
-				(struct compat_iovec __user *)kiocb->ki_buf,
-				kiocb->ki_nr_segs, 1, &kiocb->ki_inline_vec,
-				&kiocb->ki_iovec);
+				(struct compat_iovec __user *)buf,
+				*nr_segs, 1, *iovec, iovec);
 	else
 #endif
 		ret = rw_copy_check_uvector(rw,
-				(struct iovec __user *)kiocb->ki_buf,
-				kiocb->ki_nr_segs, 1, &kiocb->ki_inline_vec,
-				&kiocb->ki_iovec);
+				(struct iovec __user *)buf,
+				*nr_segs, 1, *iovec, iovec);
 	if (ret < 0)
 		return ret;
 
@@ -975,15 +1159,17 @@ static ssize_t aio_setup_vectored_rw(int rw, struct kiocb *kiocb, bool compat)
 	return 0;
 }
 
-static ssize_t aio_setup_single_vector(int rw, struct kiocb *kiocb)
+static ssize_t aio_setup_single_vector(struct kiocb *kiocb,
+				       int rw, char __user *buf,
+				       unsigned long *nr_segs,
+				       struct iovec *iovec)
 {
-	if (unlikely(!access_ok(!rw, kiocb->ki_buf, kiocb->ki_nbytes)))
+	if (unlikely(!access_ok(!rw, buf, kiocb->ki_nbytes)))
 		return -EFAULT;
 
-	kiocb->ki_iovec = &kiocb->ki_inline_vec;
-	kiocb->ki_iovec->iov_base = kiocb->ki_buf;
-	kiocb->ki_iovec->iov_len = kiocb->ki_nbytes;
-	kiocb->ki_nr_segs = 1;
+	iovec->iov_base = buf;
+	iovec->iov_len = kiocb->ki_nbytes;
+	*nr_segs = 1;
 	return 0;
 }
 
@@ -992,15 +1178,18 @@ static ssize_t aio_setup_single_vector(int rw, struct kiocb *kiocb)
 *	Performs the initial checks and aio retry method
 *	setup for the kiocb at the time of io submission.
 */
-static ssize_t aio_run_iocb(struct kiocb *req, bool compat)
+static ssize_t aio_run_iocb(struct kiocb *req, unsigned opcode,
+			    char __user *buf, bool compat)
 {
 	struct file *file = req->ki_filp;
 	ssize_t ret;
+	unsigned long nr_segs;
 	int rw;
 	fmode_t mode;
 	aio_rw_op *rw_op;
+	struct iovec inline_vec, *iovec = &inline_vec;
 
-	switch (req->ki_opcode) {
+	switch (opcode) {
 	case IOCB_CMD_PREAD:
 	case IOCB_CMD_PREADV:
 		mode	= FMODE_READ;
@@ -1021,21 +1210,38 @@ rw_common:
 		if (!rw_op)
 			return -EINVAL;
 
-		ret = (req->ki_opcode == IOCB_CMD_PREADV ||
-		       req->ki_opcode == IOCB_CMD_PWRITEV)
-			? aio_setup_vectored_rw(rw, req, compat)
-			: aio_setup_single_vector(rw, req);
+		ret = (opcode == IOCB_CMD_PREADV ||
+		       opcode == IOCB_CMD_PWRITEV)
+			? aio_setup_vectored_rw(req, rw, buf, &nr_segs,
+						&iovec, compat)
+			: aio_setup_single_vector(req, rw, buf, &nr_segs,
+						  iovec);
 		if (ret)
 			return ret;
 
 		ret = rw_verify_area(rw, file, &req->ki_pos, req->ki_nbytes);
-		if (ret < 0)
+		if (ret < 0) {
+			if (iovec != &inline_vec)
+				kfree(iovec);
 			return ret;
+		}
 
 		req->ki_nbytes = ret;
-		req->ki_left = ret;
 
-		ret = aio_rw_vect_retry(req, rw, rw_op);
+		/* XXX: move/kill - rw_verify_area()? */
+		/* This matches the pread()/pwrite() logic */
+		if (req->ki_pos < 0) {
+			ret = -EINVAL;
+			break;
+		}
+
+		if (rw == WRITE)
+			file_start_write(file);
+
+		ret = rw_op(req, iovec, nr_segs, req->ki_pos);
+
+		if (rw == WRITE)
+			file_end_write(file);
 		break;
 
 	case IOCB_CMD_FDSYNC:
@@ -1057,6 +1263,9 @@ rw_common:
 		return -EINVAL;
 	}
 
+	if (iovec != &inline_vec)
+		kfree(iovec);
+
 	if (ret != -EIOCBQUEUED) {
 		/*
 		 * There's no easy way to restart the syscall since other AIO's
@@ -1128,21 +1337,18 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 	req->ki_obj.user = user_iocb;
 	req->ki_user_data = iocb->aio_data;
 	req->ki_pos = iocb->aio_offset;
+	req->ki_nbytes = iocb->aio_nbytes;
 
-	req->ki_buf = (char __user *)(unsigned long)iocb->aio_buf;
-	req->ki_left = req->ki_nbytes = iocb->aio_nbytes;
-	req->ki_opcode = iocb->aio_lio_opcode;
-
-	ret = aio_run_iocb(req, compat);
+	ret = aio_run_iocb(req, iocb->aio_lio_opcode,
+			   (char __user *)(unsigned long)iocb->aio_buf,
+			   compat);
 	if (ret)
 		goto out_put_req;
 
-	aio_put_req(req);	/* drop extra ref to req */
 	return 0;
 out_put_req:
-	atomic_dec(&ctx->reqs_active);
-	aio_put_req(req);	/* drop extra ref to req */
-	aio_put_req(req);	/* drop i/o ref to req */
+	put_reqs_available(ctx, 1);
+	kiocb_free(req);
 	return ret;
 }
 
@@ -1195,7 +1401,7 @@ long do_io_submit(aio_context_t ctx_id, long nr,
 	}
 	blk_finish_plug(&plug);
 
-	put_ioctx(ctx);
+	percpu_ref_put(&ctx->users);
 	return i ? i : ret;
 }
 
@@ -1252,7 +1458,6 @@ static struct kiocb *lookup_kiocb(struct kioctx *ctx, struct iocb __user *iocb,
 SYSCALL_DEFINE3(io_cancel, aio_context_t, ctx_id, struct iocb __user *, iocb,
 		struct io_event __user *, result)
 {
-	struct io_event res;
 	struct kioctx *ctx;
 	struct kiocb *kiocb;
 	u32 key;
@@ -1270,21 +1475,22 @@ SYSCALL_DEFINE3(io_cancel, aio_context_t, ctx_id, struct iocb __user *, iocb,
 
 	kiocb = lookup_kiocb(ctx, iocb, key);
 	if (kiocb)
-		ret = kiocb_cancel(ctx, kiocb, &res);
+		ret = kiocb_cancel(ctx, kiocb);
 	else
 		ret = -EINVAL;
 
 	spin_unlock_irq(&ctx->ctx_lock);
 
 	if (!ret) {
-		/* Cancellation succeeded -- copy the result
-		 * into the user's buffer.
+		/*
+		 * The result argument is no longer used - the io_event is
+		 * always delivered via the ring buffer. -EINPROGRESS indicates
+		 * cancellation is progress:
 		 */
-		if (copy_to_user(result, &res, sizeof(res)))
-			ret = -EFAULT;
+		ret = -EINPROGRESS;
 	}
 
-	put_ioctx(ctx);
+	percpu_ref_put(&ctx->users);
 	return ret;
 }
 
@@ -1313,7 +1519,7 @@ SYSCALL_DEFINE5(io_getevents, aio_context_t, ctx_id,
 	if (likely(ioctx)) {
 		if (likely(min_nr <= nr && min_nr >= 0))
 			ret = read_events(ioctx, min_nr, nr, events, timeout);
-		put_ioctx(ioctx);
+		percpu_ref_put(&ioctx->users);
 	}
 	return ret;
 }
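A note on the reqs_available machinery above: get_reqs_available() and put_reqs_available() give each CPU a private cache of ring slots, so the shared atomic counter is touched only once per req_batch slots in either direction. Below is a minimal, single-threaded model of that batching scheme in C11 atomics; the slot_pool type and its names are illustrative, not from the patch.

#include <stdatomic.h>
#include <stdbool.h>

struct slot_pool {
	atomic_int global;	/* models ctx->reqs_available */
	int        local;	/* models kcpu->reqs_available (one per CPU) */
	int        batch;	/* models ctx->req_batch */
};

/* Take one slot, refilling the local cache from the global pool in batches. */
static bool pool_get(struct slot_pool *p)
{
	if (!p->local) {
		int avail = atomic_load(&p->global);

		do {
			if (avail < p->batch)
				return false;	/* global pool too low to refill */
		} while (!atomic_compare_exchange_weak(&p->global, &avail,
						       avail - p->batch));
		p->local += p->batch;
	}
	p->local--;
	return true;
}

/* Return slots locally; flush a batch back once two batches accumulate. */
static void pool_put(struct slot_pool *p, int nr)
{
	p->local += nr;
	while (p->local >= p->batch * 2) {
		p->local -= p->batch;
		atomic_fetch_add(&p->global, p->batch);
	}
}

Because up to two batches of slots can sit idle in each CPU's cache, ioctx_alloc() doubles nr_events (and compares against aio_max_nr * 2UL) so userspace still gets the queue depth it asked for.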
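The table-lookup conversion also changes how a ctx_id is resolved. ioctx_add_table() stamps the context's table index into the user-mapped ring header (ring->id, pre-set to ~0U in aio_setup_ring()), and lookup_ioctx() reads that index back with get_user() instead of walking an hlist. A rough model of the resulting O(1) lookup follows; the types are illustrative, not kernel code.

struct kctx { unsigned long user_id; };
struct ctx_table { unsigned nr; struct kctx *slot[]; };

static struct kctx *lookup(struct ctx_table *t, unsigned long ctx_id)
{
	/* ctx_id is the ring's mmap address; its first word is the index */
	unsigned id = *(unsigned *)ctx_id;	/* the kernel uses get_user() */
	struct kctx *ctx;

	if (!t || id >= t->nr)
		return NULL;

	/* a stale or forged id must still name the matching context */
	ctx = t->slot[id];
	return (ctx && ctx->user_id == ctx_id) ? ctx : NULL;
}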
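One user-visible semantic change worth flagging: kiocb_cancel() no longer produces an io_event, so a successful io_cancel(2) now returns -EINPROGRESS and the event for the cancelled iocb is delivered through the ring like any other completion; the result pointer is still accepted but no longer written. A rough userspace sketch of the new convention using raw syscalls (error handling trimmed; most file types implement no ki_cancel method, in which case io_cancel() simply fails with -EINVAL):

#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <linux/aio_abi.h>

int cancel_and_reap(void)
{
	aio_context_t ctx = 0;
	struct iocb cb, *cbs[1] = { &cb };
	struct io_event ev;
	char buf[512];

	if (syscall(__NR_io_setup, 8, &ctx) < 0)
		return -1;

	memset(&cb, 0, sizeof(cb));
	cb.aio_lio_opcode = IOCB_CMD_PREAD;
	cb.aio_fildes = 0;		/* stdin: a read that may stay pending */
	cb.aio_buf = (unsigned long)buf;
	cb.aio_nbytes = sizeof(buf);

	if (syscall(__NR_io_submit, ctx, 1, cbs) == 1 &&
	    syscall(__NR_io_cancel, ctx, &cb, &ev) < 0 &&
	    errno == EINPROGRESS)
		/* pre-patch kernels copied the event into 'ev' instead */
		syscall(__NR_io_getevents, ctx, 1, 1, &ev, NULL);

	syscall(__NR_io_destroy, ctx);
	return 0;
}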
