diff options
Diffstat (limited to 'src/backend/storage/aio/aio_callback.c')
-rw-r--r-- | src/backend/storage/aio/aio_callback.c | 308 |
1 files changed, 308 insertions, 0 deletions
diff --git a/src/backend/storage/aio/aio_callback.c b/src/backend/storage/aio/aio_callback.c new file mode 100644 index 00000000000..d5a2cca28f1 --- /dev/null +++ b/src/backend/storage/aio/aio_callback.c @@ -0,0 +1,308 @@ +/*------------------------------------------------------------------------- + * + * aio_callback.c + * AIO - Functionality related to callbacks that can be registered on IO + * Handles + * + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/storage/aio/aio_callback.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "miscadmin.h" +#include "storage/aio.h" +#include "storage/aio_internal.h" + + +/* just to have something to put into aio_handle_cbs */ +static const PgAioHandleCallbacks aio_invalid_cb = {0}; + +typedef struct PgAioHandleCallbacksEntry +{ + const PgAioHandleCallbacks *const cb; + const char *const name; +} PgAioHandleCallbacksEntry; + +/* + * Callback definition for the callbacks that can be registered on an IO + * handle. See PgAioHandleCallbackID's definition for an explanation for why + * callbacks are not identified by a pointer. + */ +static const PgAioHandleCallbacksEntry aio_handle_cbs[] = { +#define CALLBACK_ENTRY(id, callback) [id] = {.cb = &callback, .name = #callback} + CALLBACK_ENTRY(PGAIO_HCB_INVALID, aio_invalid_cb), +#undef CALLBACK_ENTRY +}; + + + +/* -------------------------------------------------------------------------------- + * Public callback related functions operating on IO Handles + * -------------------------------------------------------------------------------- + */ + +/* + * Register callback for the IO handle. + * + * Only a limited number (PGAIO_HANDLE_MAX_CALLBACKS) of callbacks can be + * registered for each IO. + * + * Callbacks need to be registered before [indirectly] calling + * pgaio_io_prep_*(), as the IO may be executed immediately. + * + * A callback can be passed a small bit of data, e.g. to indicate whether to + * zero a buffer if it is invalid. + * + * + * Note that callbacks are executed in critical sections. This is necessary + * to be able to execute IO in critical sections (consider e.g. WAL + * logging). To perform AIO we first need to acquire a handle, which, if there + * are no free handles, requires waiting for IOs to complete and to execute + * their completion callbacks. + * + * Callbacks may be executed in the issuing backend but also in another + * backend (because that backend is waiting for the IO) or in IO workers (if + * io_method=worker is used). + * + * + * See PgAioHandleCallbackID's definition for an explanation for why + * callbacks are not identified by a pointer. + */ +void +pgaio_io_register_callbacks(PgAioHandle *ioh, PgAioHandleCallbackID cb_id, + uint8 cb_data) +{ + const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id]; + + if (cb_id >= lengthof(aio_handle_cbs)) + elog(ERROR, "callback %d is out of range", cb_id); + if (aio_handle_cbs[cb_id].cb->complete_shared == NULL && + aio_handle_cbs[cb_id].cb->complete_local == NULL) + elog(ERROR, "callback %d does not have a completion callback", cb_id); + if (ioh->num_callbacks >= PGAIO_HANDLE_MAX_CALLBACKS) + elog(PANIC, "too many callbacks, the max is %d", + PGAIO_HANDLE_MAX_CALLBACKS); + ioh->callbacks[ioh->num_callbacks] = cb_id; + ioh->callbacks_data[ioh->num_callbacks] = cb_data; + + pgaio_debug_io(DEBUG3, ioh, + "adding cb #%d, id %d/%s", + ioh->num_callbacks + 1, + cb_id, ce->name); + + ioh->num_callbacks++; +} + +/* + * Associate an array of data with the Handle. This is e.g. useful to the + * transport knowledge about which buffers a multi-block IO affects to + * completion callbacks. + * + * Right now this can be done only once for each IO, even though multiple + * callbacks can be registered. There aren't any known usecases requiring more + * and the required amount of shared memory does add up, so it doesn't seem + * worth multiplying memory usage by PGAIO_HANDLE_MAX_CALLBACKS. + */ +void +pgaio_io_set_handle_data_64(PgAioHandle *ioh, uint64 *data, uint8 len) +{ + Assert(ioh->state == PGAIO_HS_HANDED_OUT); + Assert(ioh->handle_data_len == 0); + Assert(len <= PG_IOV_MAX); + + for (int i = 0; i < len; i++) + pgaio_ctl->handle_data[ioh->iovec_off + i] = data[i]; + ioh->handle_data_len = len; +} + +/* + * Convenience version of pgaio_io_set_handle_data_64() that converts a 32bit + * array to a 64bit array. Without it callers would end up needing to + * open-code equivalent code. + */ +void +pgaio_io_set_handle_data_32(PgAioHandle *ioh, uint32 *data, uint8 len) +{ + Assert(ioh->state == PGAIO_HS_HANDED_OUT); + Assert(ioh->handle_data_len == 0); + Assert(len <= PG_IOV_MAX); + + for (int i = 0; i < len; i++) + pgaio_ctl->handle_data[ioh->iovec_off + i] = data[i]; + ioh->handle_data_len = len; +} + +/* + * Return data set with pgaio_io_set_handle_data_*(). + */ +uint64 * +pgaio_io_get_handle_data(PgAioHandle *ioh, uint8 *len) +{ + Assert(ioh->handle_data_len > 0); + + *len = ioh->handle_data_len; + + return &pgaio_ctl->handle_data[ioh->iovec_off]; +} + + + +/* -------------------------------------------------------------------------------- + * Public IO Result related functions + * -------------------------------------------------------------------------------- + */ + +void +pgaio_result_report(PgAioResult result, const PgAioTargetData *target_data, int elevel) +{ + PgAioHandleCallbackID cb_id = result.id; + const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id]; + + Assert(result.status != ARS_UNKNOWN); + Assert(result.status != ARS_OK); + + if (ce->cb->report == NULL) + elog(ERROR, "callback %d/%s does not have report callback", + result.id, ce->name); + + ce->cb->report(result, target_data, elevel); +} + + + +/* -------------------------------------------------------------------------------- + * Internal callback related functions operating on IO Handles + * -------------------------------------------------------------------------------- + */ + +/* + * Internal function which invokes ->stage for all the registered callbacks. + */ +void +pgaio_io_call_stage(PgAioHandle *ioh) +{ + Assert(ioh->target > PGAIO_TID_INVALID && ioh->target < PGAIO_TID_COUNT); + Assert(ioh->op > PGAIO_OP_INVALID && ioh->op < PGAIO_OP_COUNT); + + for (int i = ioh->num_callbacks; i > 0; i--) + { + PgAioHandleCallbackID cb_id = ioh->callbacks[i - 1]; + uint8 cb_data = ioh->callbacks_data[i - 1]; + const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id]; + + if (!ce->cb->stage) + continue; + + pgaio_debug_io(DEBUG3, ioh, + "calling cb #%d %d/%s->stage(%u)", + i, cb_id, ce->name, cb_data); + ce->cb->stage(ioh, cb_data); + } +} + +/* + * Internal function which invokes ->complete_shared for all the registered + * callbacks. + */ +void +pgaio_io_call_complete_shared(PgAioHandle *ioh) +{ + PgAioResult result; + + START_CRIT_SECTION(); + + Assert(ioh->target > PGAIO_TID_INVALID && ioh->target < PGAIO_TID_COUNT); + Assert(ioh->op > PGAIO_OP_INVALID && ioh->op < PGAIO_OP_COUNT); + + result.status = ARS_OK; /* low level IO is always considered OK */ + result.result = ioh->result; + result.id = PGAIO_HCB_INVALID; + result.error_data = 0; + + /* + * Call callbacks with the last registered (innermost) callback first. + * Each callback can modify the result forwarded to the next callback. + */ + for (int i = ioh->num_callbacks; i > 0; i--) + { + PgAioHandleCallbackID cb_id = ioh->callbacks[i - 1]; + uint8 cb_data = ioh->callbacks_data[i - 1]; + const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id]; + + if (!ce->cb->complete_shared) + continue; + + pgaio_debug_io(DEBUG4, ioh, + "calling cb #%d, id %d/%s->complete_shared(%u) with distilled result: (status %s, id %u, error_data %d, result %d)", + i, cb_id, ce->name, + cb_data, + pgaio_result_status_string(result.status), + result.id, result.error_data, result.result); + result = ce->cb->complete_shared(ioh, result, cb_data); + } + + ioh->distilled_result = result; + + pgaio_debug_io(DEBUG3, ioh, + "after shared completion: distilled result: (status %s, id %u, error_data: %d, result %d), raw_result: %d", + pgaio_result_status_string(result.status), + result.id, result.error_data, result.result, + ioh->result); + + END_CRIT_SECTION(); +} + +/* + * Internal function which invokes ->complete_local for all the registered + * callbacks. + * + * XXX: It'd be nice to deduplicate with pgaio_io_call_complete_shared(). + */ +void +pgaio_io_call_complete_local(PgAioHandle *ioh) +{ + PgAioResult result; + + START_CRIT_SECTION(); + + Assert(ioh->target > PGAIO_TID_INVALID && ioh->target < PGAIO_TID_COUNT); + Assert(ioh->op > PGAIO_OP_INVALID && ioh->op < PGAIO_OP_COUNT); + + /* start with distilled result from shared callback */ + result = ioh->distilled_result; + + for (int i = ioh->num_callbacks; i > 0; i--) + { + PgAioHandleCallbackID cb_id = ioh->callbacks[i - 1]; + uint8 cb_data = ioh->callbacks_data[i - 1]; + const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id]; + + if (!ce->cb->complete_local) + continue; + + pgaio_debug_io(DEBUG4, ioh, + "calling cb #%d, id %d/%s->complete_local(%u) with distilled result: status %s, id %u, error_data %d, result %d", + i, cb_id, ce->name, cb_data, + pgaio_result_status_string(result.status), + result.id, result.error_data, result.result); + result = ce->cb->complete_local(ioh, result, cb_data); + } + + /* + * Note that we don't save the result in ioh->distilled_result, the local + * callback's result should not ever matter to other waiters. + */ + pgaio_debug_io(DEBUG3, ioh, + "after local completion: distilled result: (status %s, id %u, error_data %d, result %d), raw_result: %d", + pgaio_result_status_string(result.status), + result.id, result.error_data, result.result, + ioh->result); + + END_CRIT_SECTION(); +} |