summaryrefslogtreecommitdiff
path: root/src/backend/storage/aio/aio_callback.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/storage/aio/aio_callback.c')
-rw-r--r--src/backend/storage/aio/aio_callback.c308
1 files changed, 308 insertions, 0 deletions
diff --git a/src/backend/storage/aio/aio_callback.c b/src/backend/storage/aio/aio_callback.c
new file mode 100644
index 00000000000..d5a2cca28f1
--- /dev/null
+++ b/src/backend/storage/aio/aio_callback.c
@@ -0,0 +1,308 @@
+/*-------------------------------------------------------------------------
+ *
+ * aio_callback.c
+ * AIO - Functionality related to callbacks that can be registered on IO
+ * Handles
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/storage/aio/aio_callback.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "storage/aio.h"
+#include "storage/aio_internal.h"
+
+
+/* just to have something to put into aio_handle_cbs */
+static const PgAioHandleCallbacks aio_invalid_cb = {0};
+
+typedef struct PgAioHandleCallbacksEntry
+{
+ const PgAioHandleCallbacks *const cb;
+ const char *const name;
+} PgAioHandleCallbacksEntry;
+
+/*
+ * Callback definition for the callbacks that can be registered on an IO
+ * handle. See PgAioHandleCallbackID's definition for an explanation for why
+ * callbacks are not identified by a pointer.
+ */
+static const PgAioHandleCallbacksEntry aio_handle_cbs[] = {
+#define CALLBACK_ENTRY(id, callback) [id] = {.cb = &callback, .name = #callback}
+ CALLBACK_ENTRY(PGAIO_HCB_INVALID, aio_invalid_cb),
+#undef CALLBACK_ENTRY
+};
+
+
+
+/* --------------------------------------------------------------------------------
+ * Public callback related functions operating on IO Handles
+ * --------------------------------------------------------------------------------
+ */
+
+/*
+ * Register callback for the IO handle.
+ *
+ * Only a limited number (PGAIO_HANDLE_MAX_CALLBACKS) of callbacks can be
+ * registered for each IO.
+ *
+ * Callbacks need to be registered before [indirectly] calling
+ * pgaio_io_prep_*(), as the IO may be executed immediately.
+ *
+ * A callback can be passed a small bit of data, e.g. to indicate whether to
+ * zero a buffer if it is invalid.
+ *
+ *
+ * Note that callbacks are executed in critical sections. This is necessary
+ * to be able to execute IO in critical sections (consider e.g. WAL
+ * logging). To perform AIO we first need to acquire a handle, which, if there
+ * are no free handles, requires waiting for IOs to complete and to execute
+ * their completion callbacks.
+ *
+ * Callbacks may be executed in the issuing backend but also in another
+ * backend (because that backend is waiting for the IO) or in IO workers (if
+ * io_method=worker is used).
+ *
+ *
+ * See PgAioHandleCallbackID's definition for an explanation for why
+ * callbacks are not identified by a pointer.
+ */
+void
+pgaio_io_register_callbacks(PgAioHandle *ioh, PgAioHandleCallbackID cb_id,
+ uint8 cb_data)
+{
+ const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
+
+ if (cb_id >= lengthof(aio_handle_cbs))
+ elog(ERROR, "callback %d is out of range", cb_id);
+ if (aio_handle_cbs[cb_id].cb->complete_shared == NULL &&
+ aio_handle_cbs[cb_id].cb->complete_local == NULL)
+ elog(ERROR, "callback %d does not have a completion callback", cb_id);
+ if (ioh->num_callbacks >= PGAIO_HANDLE_MAX_CALLBACKS)
+ elog(PANIC, "too many callbacks, the max is %d",
+ PGAIO_HANDLE_MAX_CALLBACKS);
+ ioh->callbacks[ioh->num_callbacks] = cb_id;
+ ioh->callbacks_data[ioh->num_callbacks] = cb_data;
+
+ pgaio_debug_io(DEBUG3, ioh,
+ "adding cb #%d, id %d/%s",
+ ioh->num_callbacks + 1,
+ cb_id, ce->name);
+
+ ioh->num_callbacks++;
+}
+
+/*
+ * Associate an array of data with the Handle. This is e.g. useful to the
+ * transport knowledge about which buffers a multi-block IO affects to
+ * completion callbacks.
+ *
+ * Right now this can be done only once for each IO, even though multiple
+ * callbacks can be registered. There aren't any known usecases requiring more
+ * and the required amount of shared memory does add up, so it doesn't seem
+ * worth multiplying memory usage by PGAIO_HANDLE_MAX_CALLBACKS.
+ */
+void
+pgaio_io_set_handle_data_64(PgAioHandle *ioh, uint64 *data, uint8 len)
+{
+ Assert(ioh->state == PGAIO_HS_HANDED_OUT);
+ Assert(ioh->handle_data_len == 0);
+ Assert(len <= PG_IOV_MAX);
+
+ for (int i = 0; i < len; i++)
+ pgaio_ctl->handle_data[ioh->iovec_off + i] = data[i];
+ ioh->handle_data_len = len;
+}
+
+/*
+ * Convenience version of pgaio_io_set_handle_data_64() that converts a 32bit
+ * array to a 64bit array. Without it callers would end up needing to
+ * open-code equivalent code.
+ */
+void
+pgaio_io_set_handle_data_32(PgAioHandle *ioh, uint32 *data, uint8 len)
+{
+ Assert(ioh->state == PGAIO_HS_HANDED_OUT);
+ Assert(ioh->handle_data_len == 0);
+ Assert(len <= PG_IOV_MAX);
+
+ for (int i = 0; i < len; i++)
+ pgaio_ctl->handle_data[ioh->iovec_off + i] = data[i];
+ ioh->handle_data_len = len;
+}
+
+/*
+ * Return data set with pgaio_io_set_handle_data_*().
+ */
+uint64 *
+pgaio_io_get_handle_data(PgAioHandle *ioh, uint8 *len)
+{
+ Assert(ioh->handle_data_len > 0);
+
+ *len = ioh->handle_data_len;
+
+ return &pgaio_ctl->handle_data[ioh->iovec_off];
+}
+
+
+
+/* --------------------------------------------------------------------------------
+ * Public IO Result related functions
+ * --------------------------------------------------------------------------------
+ */
+
+void
+pgaio_result_report(PgAioResult result, const PgAioTargetData *target_data, int elevel)
+{
+ PgAioHandleCallbackID cb_id = result.id;
+ const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
+
+ Assert(result.status != ARS_UNKNOWN);
+ Assert(result.status != ARS_OK);
+
+ if (ce->cb->report == NULL)
+ elog(ERROR, "callback %d/%s does not have report callback",
+ result.id, ce->name);
+
+ ce->cb->report(result, target_data, elevel);
+}
+
+
+
+/* --------------------------------------------------------------------------------
+ * Internal callback related functions operating on IO Handles
+ * --------------------------------------------------------------------------------
+ */
+
+/*
+ * Internal function which invokes ->stage for all the registered callbacks.
+ */
+void
+pgaio_io_call_stage(PgAioHandle *ioh)
+{
+ Assert(ioh->target > PGAIO_TID_INVALID && ioh->target < PGAIO_TID_COUNT);
+ Assert(ioh->op > PGAIO_OP_INVALID && ioh->op < PGAIO_OP_COUNT);
+
+ for (int i = ioh->num_callbacks; i > 0; i--)
+ {
+ PgAioHandleCallbackID cb_id = ioh->callbacks[i - 1];
+ uint8 cb_data = ioh->callbacks_data[i - 1];
+ const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
+
+ if (!ce->cb->stage)
+ continue;
+
+ pgaio_debug_io(DEBUG3, ioh,
+ "calling cb #%d %d/%s->stage(%u)",
+ i, cb_id, ce->name, cb_data);
+ ce->cb->stage(ioh, cb_data);
+ }
+}
+
+/*
+ * Internal function which invokes ->complete_shared for all the registered
+ * callbacks.
+ */
+void
+pgaio_io_call_complete_shared(PgAioHandle *ioh)
+{
+ PgAioResult result;
+
+ START_CRIT_SECTION();
+
+ Assert(ioh->target > PGAIO_TID_INVALID && ioh->target < PGAIO_TID_COUNT);
+ Assert(ioh->op > PGAIO_OP_INVALID && ioh->op < PGAIO_OP_COUNT);
+
+ result.status = ARS_OK; /* low level IO is always considered OK */
+ result.result = ioh->result;
+ result.id = PGAIO_HCB_INVALID;
+ result.error_data = 0;
+
+ /*
+ * Call callbacks with the last registered (innermost) callback first.
+ * Each callback can modify the result forwarded to the next callback.
+ */
+ for (int i = ioh->num_callbacks; i > 0; i--)
+ {
+ PgAioHandleCallbackID cb_id = ioh->callbacks[i - 1];
+ uint8 cb_data = ioh->callbacks_data[i - 1];
+ const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
+
+ if (!ce->cb->complete_shared)
+ continue;
+
+ pgaio_debug_io(DEBUG4, ioh,
+ "calling cb #%d, id %d/%s->complete_shared(%u) with distilled result: (status %s, id %u, error_data %d, result %d)",
+ i, cb_id, ce->name,
+ cb_data,
+ pgaio_result_status_string(result.status),
+ result.id, result.error_data, result.result);
+ result = ce->cb->complete_shared(ioh, result, cb_data);
+ }
+
+ ioh->distilled_result = result;
+
+ pgaio_debug_io(DEBUG3, ioh,
+ "after shared completion: distilled result: (status %s, id %u, error_data: %d, result %d), raw_result: %d",
+ pgaio_result_status_string(result.status),
+ result.id, result.error_data, result.result,
+ ioh->result);
+
+ END_CRIT_SECTION();
+}
+
+/*
+ * Internal function which invokes ->complete_local for all the registered
+ * callbacks.
+ *
+ * XXX: It'd be nice to deduplicate with pgaio_io_call_complete_shared().
+ */
+void
+pgaio_io_call_complete_local(PgAioHandle *ioh)
+{
+ PgAioResult result;
+
+ START_CRIT_SECTION();
+
+ Assert(ioh->target > PGAIO_TID_INVALID && ioh->target < PGAIO_TID_COUNT);
+ Assert(ioh->op > PGAIO_OP_INVALID && ioh->op < PGAIO_OP_COUNT);
+
+ /* start with distilled result from shared callback */
+ result = ioh->distilled_result;
+
+ for (int i = ioh->num_callbacks; i > 0; i--)
+ {
+ PgAioHandleCallbackID cb_id = ioh->callbacks[i - 1];
+ uint8 cb_data = ioh->callbacks_data[i - 1];
+ const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
+
+ if (!ce->cb->complete_local)
+ continue;
+
+ pgaio_debug_io(DEBUG4, ioh,
+ "calling cb #%d, id %d/%s->complete_local(%u) with distilled result: status %s, id %u, error_data %d, result %d",
+ i, cb_id, ce->name, cb_data,
+ pgaio_result_status_string(result.status),
+ result.id, result.error_data, result.result);
+ result = ce->cb->complete_local(ioh, result, cb_data);
+ }
+
+ /*
+ * Note that we don't save the result in ioh->distilled_result, the local
+ * callback's result should not ever matter to other waiters.
+ */
+ pgaio_debug_io(DEBUG3, ioh,
+ "after local completion: distilled result: (status %s, id %u, error_data %d, result %d), raw_result: %d",
+ pgaio_result_status_string(result.status),
+ result.id, result.error_data, result.result,
+ ioh->result);
+
+ END_CRIT_SECTION();
+}