diff options
Diffstat (limited to 'io_uring/cancel.c')
| -rw-r--r-- | io_uring/cancel.c | 270 |
1 files changed, 270 insertions, 0 deletions
diff --git a/io_uring/cancel.c b/io_uring/cancel.c
index 64b51e82baa2..ca12ac10c0ae 100644
--- a/io_uring/cancel.c
+++ b/io_uring/cancel.c
@@ -14,6 +14,8 @@
 #include "filetable.h"
 #include "io_uring.h"
 #include "tctx.h"
+#include "sqpoll.h"
+#include "uring_cmd.h"
 #include "poll.h"
 #include "timeout.h"
 #include "waitid.h"
@@ -384,3 +386,271 @@ int io_cancel_remove(struct io_ring_ctx *ctx, struct io_cancel_data *cd,
 	io_ring_submit_unlock(ctx, issue_flags);
 	return nr ?: -ENOENT;
 }
+/*
+ * Walk the link chain that starts at @head and report whether any request in
+ * it has REQ_F_INFLIGHT set. Returns true on the first such request and false
+ * if none is found. Takes no lock itself; io_match_task_safe() wraps the call
+ * in ctx->timeout_lock when the head carries REQ_F_LINK_TIMEOUT.
+ */
+static bool io_match_linked(struct io_kiocb *head)
+{
+	struct io_kiocb *req;
+
+	io_for_each_link(req, head) {
+		if (req->flags & REQ_F_INFLIGHT)
+			return true;
+	}
+	return false;
+}
+
+/*
+ * As io_match_task() but protected against racing with linked timeouts.
+ * User must not hold timeout_lock.
+ *
+ * @head:       first request of the link chain to test
+ * @tctx:       task context to match against; NULL disables the task filter
+ * @cancel_all: when true, every request that passes the task filter matches
+ *
+ * Returns false if @tctx is set and differs from head->tctx, true if
+ * @cancel_all is set, and otherwise the result of io_match_linked(). For a
+ * head with REQ_F_LINK_TIMEOUT the chain walk is done with ctx->timeout_lock
+ * held (raw_spin_lock_irq), which is why the caller must not already hold it.
+ */
+bool io_match_task_safe(struct io_kiocb *head, struct io_uring_task *tctx,
+			bool cancel_all)
+{
+	bool matched;
+
+	if (tctx && head->tctx != tctx)
+		return false;
+	if (cancel_all)
+		return true;
+
+	if (head->flags & REQ_F_LINK_TIMEOUT) {
+		struct io_ring_ctx *ctx = head->ctx;
+
+		/* protect against races with linked timeouts */
+		raw_spin_lock_irq(&ctx->timeout_lock);
+		matched = io_match_linked(head);
+		raw_spin_unlock_irq(&ctx->timeout_lock);
+	} else {
+		matched = io_match_linked(head);
+	}
+	return matched;
+}
+/*
+ * Cancellation entry point for a task that is not an SQPOLL thread: calls
+ * io_uring_unreg_ringfd() and then io_uring_cancel_generic() with a NULL sqd.
+ * @cancel_all is passed through unchanged.
+ */
+void __io_uring_cancel(bool cancel_all)
+{
+	io_uring_unreg_ringfd();
+	io_uring_cancel_generic(cancel_all, NULL);
+}
+/*
+ * Argument bundle handed to io_cancel_task_cb() through its opaque data
+ * pointer: the task context to match and the cancel_all flag.
+ */
+struct io_task_cancel {
+	struct io_uring_task *tctx;
+	bool all;
+};
+/*
+ * Match callback passed to io_wq_cancel_cb(). Recovers the io_kiocb that embeds
+ * @work and returns io_match_task_safe() for it, using the tctx and all fields
+ * of the struct io_task_cancel that @data points to.
+ */
+static bool io_cancel_task_cb(struct io_wq_work *work, void *data)
+{
+	struct io_kiocb *req = container_of(work, struct io_kiocb, work);
+	struct io_task_cancel *cancel = data;
+
+	return io_match_task_safe(req, cancel->tctx, cancel->all);
+}
+/*
+ * Fail deferred requests queued on ctx->defer_list.
+ *
+ * The list is scanned from the tail for the last entry whose request matches
+ * io_match_task_safe(@tctx, @cancel_all). list_cut_position() then moves the
+ * front of defer_list, up to and including that entry, onto a local list, so
+ * entries queued ahead of the match are taken along with it. Each moved entry
+ * is unlinked, ctx->nr_drained is reduced by io_linked_nr() of its request, the
+ * request is failed with -ECANCELED through io_req_task_queue_fail(), and the
+ * entry is freed.
+ *
+ * Returns true if at least one entry was moved, false if nothing matched. The
+ * caller visible in this file, io_uring_try_cancel_requests(), holds
+ * ctx->uring_lock around the call.
+ */
+static __cold bool io_cancel_defer_files(struct io_ring_ctx *ctx,
+					 struct io_uring_task *tctx,
+					 bool cancel_all)
+{
+	struct io_defer_entry *de;
+	LIST_HEAD(list);
+
+	list_for_each_entry_reverse(de, &ctx->defer_list, list) {
+		if (io_match_task_safe(de->req, tctx, cancel_all)) {
+			list_cut_position(&list, &ctx->defer_list, &de->list);
+			break;
+		}
+	}
+	if (list_empty(&list))
+		return false;
+
+	while (!list_empty(&list)) {
+		de = list_first_entry(&list, struct io_defer_entry, list);
+		list_del_init(&de->list);
+		ctx->nr_drained -= io_linked_nr(de->req);
+		io_req_task_queue_fail(de->req, -ECANCELED);
+		kfree(de);
+	}
+	return true;
+}
+/*
+ * Match callback passed to io_wq_cancel_cb(): returns true when the request
+ * that embeds @work belongs to the ring passed in @data (compared by pointer).
+ */
+__cold bool io_cancel_ctx_cb(struct io_wq_work *work, void *data)
+{
+	struct io_kiocb *req = container_of(work, struct io_kiocb, work);
+
+	return req->ctx == data;
+}
+/*
+ * Run io_wq_cancel_cb() with io_cancel_ctx_cb() against the io-wq of every task
+ * found on ctx->tctx_list, skipping tasks that have no io_uring task context or
+ * no io-wq. The whole walk is done under ctx->uring_lock.
+ *
+ * Returns true if any of those calls returned something other than
+ * IO_WQ_CANCEL_NOTFOUND, false otherwise.
+ */
+static __cold bool io_uring_try_cancel_iowq(struct io_ring_ctx *ctx)
+{
+	struct io_tctx_node *node;
+	enum io_wq_cancel cret;
+	bool ret = false;
+
+	mutex_lock(&ctx->uring_lock);
+	list_for_each_entry(node, &ctx->tctx_list, ctx_node) {
+		struct io_uring_task *tctx = node->task->io_uring;
+
+		/*
+		 * io_wq will stay alive while we hold uring_lock, because it's
+		 * killed after ctx nodes, which requires to take the lock.
+		 */
+		if (!tctx || !tctx->io_wq)
+			continue;
+		cret = io_wq_cancel_cb(tctx->io_wq, io_cancel_ctx_cb, ctx, true);
+		ret |= (cret != IO_WQ_CANCEL_NOTFOUND);
+	}
+	mutex_unlock(&ctx->uring_lock);
+
+	return ret;
+}
+/*
+ * Make one cancellation pass over @ctx.
+ *
+ * @ctx:              ring to cancel requests on
+ * @tctx:             task context to restrict the pass to; with NULL the io-wq
+ *                    of every task on ctx->tctx_list is scanned instead and
+ *                    ctx->fallback_work is flushed rather than task work run
+ * @cancel_all:       forwarded to the matching helpers
+ * @is_sqpoll_thread: true when called from the SQPOLL thread itself
+ *
+ * Order of the pass: io-wq work, iopoll reaping (only when @is_sqpoll_thread is
+ * set, or when the ring is not SQPOLL and @cancel_all is set), deferred local
+ * task work, then deferred requests, poll, waitid, futex and uring_cmd under
+ * ctx->uring_lock, then timeouts after the lock is dropped, and finally task
+ * work or the fallback work.
+ *
+ * Returns true if any step reported that it found or ran something, false
+ * otherwise; io_uring_cancel_generic() retries while the result is true.
+ * Returns false right after the DEFER_TASKRUN setup if ctx->rings is NULL.
+ */
+__cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
+					 struct io_uring_task *tctx,
+					 bool cancel_all, bool is_sqpoll_thread)
+{
+	struct io_task_cancel cancel = { .tctx = tctx, .all = cancel_all, };
+	enum io_wq_cancel cret;
+	bool ret = false;
+
+	/* set it so io_req_local_work_add() would wake us up */
+	if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
+		atomic_set(&ctx->cq_wait_nr, 1);
+		smp_mb();
+	}
+
+	/* failed during ring init, it couldn't have issued any requests */
+	if (!ctx->rings)
+		return false;
+
+	if (!tctx) {
+		ret |= io_uring_try_cancel_iowq(ctx);
+	} else if (tctx->io_wq) {
+		/*
+		 * Cancels requests of all rings, not only @ctx, but
+		 * it's fine as the task is in exit/exec.
+		 */
+		cret = io_wq_cancel_cb(tctx->io_wq, io_cancel_task_cb,
+				       &cancel, true);
+		ret |= (cret != IO_WQ_CANCEL_NOTFOUND);
+	}
+
+	/* SQPOLL thread does its own polling */
+	if ((!(ctx->flags & IORING_SETUP_SQPOLL) && cancel_all) ||
+	    is_sqpoll_thread) {
+		while (!wq_list_empty(&ctx->iopoll_list)) {
+			io_iopoll_try_reap_events(ctx);
+			ret = true;
+			cond_resched();
+		}
+	}
+
+	if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) &&
+	    io_allowed_defer_tw_run(ctx))
+		ret |= io_run_local_work(ctx, INT_MAX, INT_MAX) > 0;
+	mutex_lock(&ctx->uring_lock);
+	ret |= io_cancel_defer_files(ctx, tctx, cancel_all);
+	ret |= io_poll_remove_all(ctx, tctx, cancel_all);
+	ret |= io_waitid_remove_all(ctx, tctx, cancel_all);
+	ret |= io_futex_remove_all(ctx, tctx, cancel_all);
+	ret |= io_uring_try_cancel_uring_cmd(ctx, tctx, cancel_all);
+	mutex_unlock(&ctx->uring_lock);
+	ret |= io_kill_timeouts(ctx, tctx, cancel_all); /* called without uring_lock */
+	if (tctx)
+		ret |= io_run_task_work() > 0;
+	else
+		ret |= flush_delayed_work(&ctx->fallback_work);
+	return ret;
+}
+/*
+ * Read an inflight count for @tctx: the inflight_tracked atomic when @tracked
+ * is true, otherwise the sum of the inflight percpu counter.
+ */
+static s64 tctx_inflight(struct io_uring_task *tctx, bool tracked)
+{
+	if (tracked)
+		return atomic_read(&tctx->inflight_tracked);
+	return percpu_counter_sum(&tctx->inflight);
+}
+
+/*
+ * Find any io_uring ctx that this task has registered or done IO on, and cancel
+ * requests. @sqd should be not-null IFF it's an SQPOLL thread cancellation.
+ *
+ * Returns at once if current has no io_uring task context. Otherwise it calls
+ * io_wq_exit_start() on the task io-wq (when there is one), raises
+ * tctx->in_cancel and loops. Each pass calls io_uring_drop_tctx_refs(), stops
+ * once the inflight count reads zero, and runs io_uring_try_cancel_requests()
+ * on every ring reached through tctx->xa (skipping rings that have sq_data)
+ * or, when @sqd is given, on every ring in sqd->ctx_list. A pass that reported
+ * progress is retried after cond_resched(). Otherwise the task sleeps on
+ * tctx->wait in TASK_INTERRUPTIBLE state, unless local work is pending on one
+ * of its rings or the inflight count changed since it was sampled.
+ *
+ * After the loop io_uring_clean_tctx() is called; with @cancel_all set,
+ * in_cancel is decremented again and __io_uring_free(current) is called.
+ */
+__cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd)
+{
+	struct io_uring_task *tctx = current->io_uring;
+	struct io_ring_ctx *ctx;
+	struct io_tctx_node *node;
+	unsigned long index;
+	s64 inflight;
+	DEFINE_WAIT(wait);
+
+	WARN_ON_ONCE(sqd && sqpoll_task_locked(sqd) != current);
+
+	if (!current->io_uring)
+		return;
+	if (tctx->io_wq)
+		io_wq_exit_start(tctx->io_wq);
+
+	atomic_inc(&tctx->in_cancel);
+	do {
+		bool loop = false;
+
+		io_uring_drop_tctx_refs(current);
+		if (!tctx_inflight(tctx, !cancel_all))
+			break;
+
+		/* read completions before cancelations */
+		inflight = tctx_inflight(tctx, false);
+		if (!inflight)
+			break;
+
+		if (!sqd) {
+			xa_for_each(&tctx->xa, index, node) {
+				/* sqpoll task will cancel all its requests */
+				if (node->ctx->sq_data)
+					continue;
+				loop |= io_uring_try_cancel_requests(node->ctx,
+							current->io_uring,
+							cancel_all,
+							false);
+			}
+		} else {
+			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
+				loop |= io_uring_try_cancel_requests(ctx,
+								     current->io_uring,
+								     cancel_all,
+								     true);
+		}
+
+		if (loop) {
+			cond_resched();
+			continue;
+		}
+
+		prepare_to_wait(&tctx->wait, &wait, TASK_INTERRUPTIBLE);
+		io_run_task_work();
+		io_uring_drop_tctx_refs(current);
+		xa_for_each(&tctx->xa, index, node) {
+			if (io_local_work_pending(node->ctx)) {
+				WARN_ON_ONCE(node->ctx->submitter_task &&
+					     node->ctx->submitter_task != current);
+				goto end_wait;
+			}
+		}
+		/*
+		 * If we've seen completions, retry without waiting. This
+		 * avoids a race where a completion comes in before we did
+		 * prepare_to_wait().
+		 */
+		if (inflight == tctx_inflight(tctx, !cancel_all))
+			schedule();
+end_wait:
+		finish_wait(&tctx->wait, &wait);
+	} while (1);
+
+	io_uring_clean_tctx(tctx);
+	if (cancel_all) {
+		/*
+		 * We shouldn't run task_works after cancel, so just leave
+		 * ->in_cancel set for normal exit.
+		 */
+		atomic_dec(&tctx->in_cancel);
+		/* for exec all current's requests should be gone, kill tctx */
+		__io_uring_free(current);
+	}
+}
|
