diff options
| author | Alvaro Herrera <alvherre@alvh.no-ip.org> | 2014-11-29 23:55:00 -0300 |
|---|---|---|
| committer | Alvaro Herrera <alvherre@alvh.no-ip.org> | 2014-11-29 23:55:00 -0300 |
| commit | 22dfd116a127a2fc916a4fdac282ee69d4905a25 (patch) | |
| tree | f72fa75958a5c9bbecda92bee316258c5680c410 /contrib | |
| parent | 5b12987b2e80fcf3af1f6fd23954da5c453e9e64 (diff) | |
Move test modules from contrib to src/test/modules
This is advance preparation for introducing even more test modules; the
easy solution is to add them to contrib, but that's bloated enough that
it seems a good time to think of something different.
Moved modules are dummy_seclabel, test_shm_mq, test_parser and
worker_spi.
(test_decoding was also a candidate, but there was too much opposition
to moving that one. We can always reconsider later.)
Diffstat (limited to 'contrib')
25 files changed, 1 insertions, 1728 deletions
diff --git a/contrib/Makefile b/contrib/Makefile index b37d0dd2c31..195d4472c57 100644 --- a/contrib/Makefile +++ b/contrib/Makefile @@ -16,7 +16,6 @@ SUBDIRS = \ dblink \ dict_int \ dict_xsyn \ - dummy_seclabel \ earthdistance \ file_fdw \ fuzzystrmatch \ @@ -51,12 +50,9 @@ SUBDIRS = \ tablefunc \ tcn \ test_decoding \ - test_parser \ - test_shm_mq \ tsearch2 \ unaccent \ - vacuumlo \ - worker_spi + vacuumlo ifeq ($(with_openssl),yes) SUBDIRS += sslinfo diff --git a/contrib/dummy_seclabel/Makefile b/contrib/dummy_seclabel/Makefile deleted file mode 100644 index e69aa1ff6c4..00000000000 --- a/contrib/dummy_seclabel/Makefile +++ /dev/null @@ -1,15 +0,0 @@ -# contrib/dummy_seclabel/Makefile - -MODULES = dummy_seclabel -PGFILEDESC = "dummy_seclabel - regression testing of the SECURITY LABEL statement" - -ifdef USE_PGXS -PG_CONFIG = pg_config -PGXS := $(shell $(PG_CONFIG) --pgxs) -include $(PGXS) -else -subdir = contrib/dummy_seclabel -top_builddir = ../.. -include $(top_builddir)/src/Makefile.global -include $(top_srcdir)/contrib/contrib-global.mk -endif diff --git a/contrib/dummy_seclabel/dummy_seclabel.c b/contrib/dummy_seclabel/dummy_seclabel.c deleted file mode 100644 index b5753cc9084..00000000000 --- a/contrib/dummy_seclabel/dummy_seclabel.c +++ /dev/null @@ -1,50 +0,0 @@ -/* - * dummy_seclabel.c - * - * Dummy security label provider. - * - * This module does not provide anything worthwhile from a security - * perspective, but allows regression testing independent of platform-specific - * features like SELinux. - * - * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group - * Portions Copyright (c) 1994, Regents of the University of California - */ -#include "postgres.h" - -#include "commands/seclabel.h" -#include "miscadmin.h" -#include "utils/rel.h" - -PG_MODULE_MAGIC; - -/* Entrypoint of the module */ -void _PG_init(void); - -static void -dummy_object_relabel(const ObjectAddress *object, const char *seclabel) -{ - if (seclabel == NULL || - strcmp(seclabel, "unclassified") == 0 || - strcmp(seclabel, "classified") == 0) - return; - - if (strcmp(seclabel, "secret") == 0 || - strcmp(seclabel, "top secret") == 0) - { - if (!superuser()) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("only superuser can set '%s' label", seclabel))); - return; - } - ereport(ERROR, - (errcode(ERRCODE_INVALID_NAME), - errmsg("'%s' is not a valid security label", seclabel))); -} - -void -_PG_init(void) -{ - register_label_provider("dummy", dummy_object_relabel); -} diff --git a/contrib/test_parser/.gitignore b/contrib/test_parser/.gitignore deleted file mode 100644 index 5dcb3ff9723..00000000000 --- a/contrib/test_parser/.gitignore +++ /dev/null @@ -1,4 +0,0 @@ -# Generated subdirectories -/log/ -/results/ -/tmp_check/ diff --git a/contrib/test_parser/Makefile b/contrib/test_parser/Makefile deleted file mode 100644 index 7e068abd3e6..00000000000 --- a/contrib/test_parser/Makefile +++ /dev/null @@ -1,21 +0,0 @@ -# contrib/test_parser/Makefile - -MODULE_big = test_parser -OBJS = test_parser.o $(WIN32RES) -PGFILEDESC = "test_parser - example of a custom parser for full-text search" - -EXTENSION = test_parser -DATA = test_parser--1.0.sql test_parser--unpackaged--1.0.sql - -REGRESS = test_parser - -ifdef USE_PGXS -PG_CONFIG = pg_config -PGXS := $(shell $(PG_CONFIG) --pgxs) -include $(PGXS) -else -subdir = contrib/test_parser -top_builddir = ../.. -include $(top_builddir)/src/Makefile.global -include $(top_srcdir)/contrib/contrib-global.mk -endif diff --git a/contrib/test_parser/expected/test_parser.out b/contrib/test_parser/expected/test_parser.out deleted file mode 100644 index 8a49bc01e32..00000000000 --- a/contrib/test_parser/expected/test_parser.out +++ /dev/null @@ -1,44 +0,0 @@ -CREATE EXTENSION test_parser; --- make test configuration using parser -CREATE TEXT SEARCH CONFIGURATION testcfg (PARSER = testparser); -ALTER TEXT SEARCH CONFIGURATION testcfg ADD MAPPING FOR word WITH simple; --- ts_parse -SELECT * FROM ts_parse('testparser', 'That''s simple parser can''t parse urls like http://some.url/here/'); - tokid | token --------+----------------------- - 3 | That's - 12 | - 3 | simple - 12 | - 3 | parser - 12 | - 3 | can't - 12 | - 3 | parse - 12 | - 3 | urls - 12 | - 3 | like - 12 | - 3 | http://some.url/here/ -(15 rows) - -SELECT to_tsvector('testcfg','That''s my first own parser'); - to_tsvector -------------------------------------------------- - 'first':3 'my':2 'own':4 'parser':5 'that''s':1 -(1 row) - -SELECT to_tsquery('testcfg', 'star'); - to_tsquery ------------- - 'star' -(1 row) - -SELECT ts_headline('testcfg','Supernovae stars are the brightest phenomena in galaxies', - to_tsquery('testcfg', 'stars')); - ts_headline ------------------------------------------------------------------ - Supernovae <b>stars</b> are the brightest phenomena in galaxies -(1 row) - diff --git a/contrib/test_parser/sql/test_parser.sql b/contrib/test_parser/sql/test_parser.sql deleted file mode 100644 index 1f21504602b..00000000000 --- a/contrib/test_parser/sql/test_parser.sql +++ /dev/null @@ -1,18 +0,0 @@ -CREATE EXTENSION test_parser; - --- make test configuration using parser - -CREATE TEXT SEARCH CONFIGURATION testcfg (PARSER = testparser); - -ALTER TEXT SEARCH CONFIGURATION testcfg ADD MAPPING FOR word WITH simple; - --- ts_parse - -SELECT * FROM ts_parse('testparser', 'That''s simple parser can''t parse urls like http://some.url/here/'); - -SELECT to_tsvector('testcfg','That''s my first own parser'); - -SELECT to_tsquery('testcfg', 'star'); - -SELECT ts_headline('testcfg','Supernovae stars are the brightest phenomena in galaxies', - to_tsquery('testcfg', 'stars')); diff --git a/contrib/test_parser/test_parser--1.0.sql b/contrib/test_parser/test_parser--1.0.sql deleted file mode 100644 index 06c94d353be..00000000000 --- a/contrib/test_parser/test_parser--1.0.sql +++ /dev/null @@ -1,32 +0,0 @@ -/* contrib/test_parser/test_parser--1.0.sql */ - --- complain if script is sourced in psql, rather than via CREATE EXTENSION -\echo Use "CREATE EXTENSION test_parser" to load this file. \quit - -CREATE FUNCTION testprs_start(internal, int4) -RETURNS internal -AS 'MODULE_PATHNAME' -LANGUAGE C STRICT; - -CREATE FUNCTION testprs_getlexeme(internal, internal, internal) -RETURNS internal -AS 'MODULE_PATHNAME' -LANGUAGE C STRICT; - -CREATE FUNCTION testprs_end(internal) -RETURNS void -AS 'MODULE_PATHNAME' -LANGUAGE C STRICT; - -CREATE FUNCTION testprs_lextype(internal) -RETURNS internal -AS 'MODULE_PATHNAME' -LANGUAGE C STRICT; - -CREATE TEXT SEARCH PARSER testparser ( - START = testprs_start, - GETTOKEN = testprs_getlexeme, - END = testprs_end, - HEADLINE = pg_catalog.prsd_headline, - LEXTYPES = testprs_lextype -); diff --git a/contrib/test_parser/test_parser--unpackaged--1.0.sql b/contrib/test_parser/test_parser--unpackaged--1.0.sql deleted file mode 100644 index 62458bd2c68..00000000000 --- a/contrib/test_parser/test_parser--unpackaged--1.0.sql +++ /dev/null @@ -1,10 +0,0 @@ -/* contrib/test_parser/test_parser--unpackaged--1.0.sql */ - --- complain if script is sourced in psql, rather than via CREATE EXTENSION -\echo Use "CREATE EXTENSION test_parser FROM unpackaged" to load this file. \quit - -ALTER EXTENSION test_parser ADD function testprs_start(internal,integer); -ALTER EXTENSION test_parser ADD function testprs_getlexeme(internal,internal,internal); -ALTER EXTENSION test_parser ADD function testprs_end(internal); -ALTER EXTENSION test_parser ADD function testprs_lextype(internal); -ALTER EXTENSION test_parser ADD text search parser testparser; diff --git a/contrib/test_parser/test_parser.c b/contrib/test_parser/test_parser.c deleted file mode 100644 index cbf77966ae5..00000000000 --- a/contrib/test_parser/test_parser.c +++ /dev/null @@ -1,128 +0,0 @@ -/*------------------------------------------------------------------------- - * - * test_parser.c - * Simple example of a text search parser - * - * Copyright (c) 2007-2014, PostgreSQL Global Development Group - * - * IDENTIFICATION - * contrib/test_parser/test_parser.c - * - *------------------------------------------------------------------------- - */ -#include "postgres.h" - -#include "fmgr.h" - -PG_MODULE_MAGIC; - -/* - * types - */ - -/* self-defined type */ -typedef struct -{ - char *buffer; /* text to parse */ - int len; /* length of the text in buffer */ - int pos; /* position of the parser */ -} ParserState; - -/* copy-paste from wparser.h of tsearch2 */ -typedef struct -{ - int lexid; - char *alias; - char *descr; -} LexDescr; - -/* - * functions - */ -PG_FUNCTION_INFO_V1(testprs_start); -PG_FUNCTION_INFO_V1(testprs_getlexeme); -PG_FUNCTION_INFO_V1(testprs_end); -PG_FUNCTION_INFO_V1(testprs_lextype); - -Datum -testprs_start(PG_FUNCTION_ARGS) -{ - ParserState *pst = (ParserState *) palloc0(sizeof(ParserState)); - - pst->buffer = (char *) PG_GETARG_POINTER(0); - pst->len = PG_GETARG_INT32(1); - pst->pos = 0; - - PG_RETURN_POINTER(pst); -} - -Datum -testprs_getlexeme(PG_FUNCTION_ARGS) -{ - ParserState *pst = (ParserState *) PG_GETARG_POINTER(0); - char **t = (char **) PG_GETARG_POINTER(1); - int *tlen = (int *) PG_GETARG_POINTER(2); - int startpos = pst->pos; - int type; - - *t = pst->buffer + pst->pos; - - if (pst->pos < pst->len && - (pst->buffer)[pst->pos] == ' ') - { - /* blank type */ - type = 12; - /* go to the next non-space character */ - while (pst->pos < pst->len && - (pst->buffer)[pst->pos] == ' ') - (pst->pos)++; - } - else - { - /* word type */ - type = 3; - /* go to the next space character */ - while (pst->pos < pst->len && - (pst->buffer)[pst->pos] != ' ') - (pst->pos)++; - } - - *tlen = pst->pos - startpos; - - /* we are finished if (*tlen == 0) */ - if (*tlen == 0) - type = 0; - - PG_RETURN_INT32(type); -} - -Datum -testprs_end(PG_FUNCTION_ARGS) -{ - ParserState *pst = (ParserState *) PG_GETARG_POINTER(0); - - pfree(pst); - PG_RETURN_VOID(); -} - -Datum -testprs_lextype(PG_FUNCTION_ARGS) -{ - /* - * Remarks: - we have to return the blanks for headline reason - we use - * the same lexids like Teodor in the default word parser; in this way we - * can reuse the headline function of the default word parser. - */ - LexDescr *descr = (LexDescr *) palloc(sizeof(LexDescr) * (2 + 1)); - - /* there are only two types in this parser */ - descr[0].lexid = 3; - descr[0].alias = pstrdup("word"); - descr[0].descr = pstrdup("Word"); - descr[1].lexid = 12; - descr[1].alias = pstrdup("blank"); - descr[1].descr = pstrdup("Space symbols"); - descr[2].lexid = 0; - - PG_RETURN_POINTER(descr); -} diff --git a/contrib/test_parser/test_parser.control b/contrib/test_parser/test_parser.control deleted file mode 100644 index 36b26b2087c..00000000000 --- a/contrib/test_parser/test_parser.control +++ /dev/null @@ -1,5 +0,0 @@ -# test_parser extension -comment = 'example of a custom parser for full-text search' -default_version = '1.0' -module_pathname = '$libdir/test_parser' -relocatable = true diff --git a/contrib/test_shm_mq/.gitignore b/contrib/test_shm_mq/.gitignore deleted file mode 100644 index 5dcb3ff9723..00000000000 --- a/contrib/test_shm_mq/.gitignore +++ /dev/null @@ -1,4 +0,0 @@ -# Generated subdirectories -/log/ -/results/ -/tmp_check/ diff --git a/contrib/test_shm_mq/Makefile b/contrib/test_shm_mq/Makefile deleted file mode 100644 index e3c405442c5..00000000000 --- a/contrib/test_shm_mq/Makefile +++ /dev/null @@ -1,21 +0,0 @@ -# contrib/test_shm_mq/Makefile - -MODULE_big = test_shm_mq -OBJS = test.o setup.o worker.o $(WIN32RES) -PGFILEDESC = "test_shm_mq - example use of shared memory message queue" - -EXTENSION = test_shm_mq -DATA = test_shm_mq--1.0.sql - -REGRESS = test_shm_mq - -ifdef USE_PGXS -PG_CONFIG = pg_config -PGXS := $(shell $(PG_CONFIG) --pgxs) -include $(PGXS) -else -subdir = contrib/test_shm_mq -top_builddir = ../.. -include $(top_builddir)/src/Makefile.global -include $(top_srcdir)/contrib/contrib-global.mk -endif diff --git a/contrib/test_shm_mq/expected/test_shm_mq.out b/contrib/test_shm_mq/expected/test_shm_mq.out deleted file mode 100644 index c4858b0c205..00000000000 --- a/contrib/test_shm_mq/expected/test_shm_mq.out +++ /dev/null @@ -1,36 +0,0 @@ -CREATE EXTENSION test_shm_mq; --- --- These tests don't produce any interesting output. We're checking that --- the operations complete without crashing or hanging and that none of their --- internal sanity tests fail. --- -SELECT test_shm_mq(1024, '', 2000, 1); - test_shm_mq -------------- - -(1 row) - -SELECT test_shm_mq(1024, 'a', 2001, 1); - test_shm_mq -------------- - -(1 row) - -SELECT test_shm_mq(32768, (select string_agg(chr(32+(random()*95)::int), '') from generate_series(1,(100+900*random())::int)), 10000, 1); - test_shm_mq -------------- - -(1 row) - -SELECT test_shm_mq(100, (select string_agg(chr(32+(random()*95)::int), '') from generate_series(1,(100+200*random())::int)), 10000, 1); - test_shm_mq -------------- - -(1 row) - -SELECT test_shm_mq_pipelined(16384, (select string_agg(chr(32+(random()*95)::int), '') from generate_series(1,270000)), 200, 3); - test_shm_mq_pipelined ------------------------ - -(1 row) - diff --git a/contrib/test_shm_mq/setup.c b/contrib/test_shm_mq/setup.c deleted file mode 100644 index 572cf8898f1..00000000000 --- a/contrib/test_shm_mq/setup.c +++ /dev/null @@ -1,328 +0,0 @@ -/*-------------------------------------------------------------------------- - * - * setup.c - * Code to set up a dynamic shared memory segments and a specified - * number of background workers for shared memory message queue - * testing. - * - * Copyright (C) 2013, PostgreSQL Global Development Group - * - * IDENTIFICATION - * contrib/test_shm_mq/setup.c - * - * ------------------------------------------------------------------------- - */ - -#include "postgres.h" - -#include "miscadmin.h" -#include "postmaster/bgworker.h" -#include "storage/procsignal.h" -#include "storage/shm_toc.h" -#include "utils/memutils.h" - -#include "test_shm_mq.h" - -typedef struct -{ - int nworkers; - BackgroundWorkerHandle *handle[FLEXIBLE_ARRAY_MEMBER]; -} worker_state; - -static void setup_dynamic_shared_memory(int64 queue_size, int nworkers, - dsm_segment **segp, - test_shm_mq_header **hdrp, - shm_mq **outp, shm_mq **inp); -static worker_state *setup_background_workers(int nworkers, - dsm_segment *seg); -static void cleanup_background_workers(dsm_segment *seg, Datum arg); -static void wait_for_workers_to_become_ready(worker_state *wstate, - volatile test_shm_mq_header *hdr); -static bool check_worker_status(worker_state *wstate); - -/* - * Set up a dynamic shared memory segment and zero or more background workers - * for a test run. - */ -void -test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp, - shm_mq_handle **output, shm_mq_handle **input) -{ - dsm_segment *seg; - test_shm_mq_header *hdr; - shm_mq *outq = NULL; /* placate compiler */ - shm_mq *inq = NULL; /* placate compiler */ - worker_state *wstate; - - /* Set up a dynamic shared memory segment. */ - setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq); - *segp = seg; - - /* Register background workers. */ - wstate = setup_background_workers(nworkers, seg); - - /* Attach the queues. */ - *output = shm_mq_attach(outq, seg, wstate->handle[0]); - *input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]); - - /* Wait for workers to become ready. */ - wait_for_workers_to_become_ready(wstate, hdr); - - /* - * Once we reach this point, all workers are ready. We no longer need to - * kill them if we die; they'll die on their own as the message queues - * shut down. - */ - cancel_on_dsm_detach(seg, cleanup_background_workers, - PointerGetDatum(wstate)); - pfree(wstate); -} - -/* - * Set up a dynamic shared memory segment. - * - * We set up a small control region that contains only a test_shm_mq_header, - * plus one region per message queue. There are as many message queues as - * the number of workers, plus one. - */ -static void -setup_dynamic_shared_memory(int64 queue_size, int nworkers, - dsm_segment **segp, test_shm_mq_header **hdrp, - shm_mq **outp, shm_mq **inp) -{ - shm_toc_estimator e; - int i; - Size segsize; - dsm_segment *seg; - shm_toc *toc; - test_shm_mq_header *hdr; - - /* Ensure a valid queue size. */ - if (queue_size < 0 || ((uint64) queue_size) < shm_mq_minimum_size) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("queue size must be at least %zu bytes", - shm_mq_minimum_size))); - if (queue_size != ((Size) queue_size)) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("queue size overflows size_t"))); - - /* - * Estimate how much shared memory we need. - * - * Because the TOC machinery may choose to insert padding of oddly-sized - * requests, we must estimate each chunk separately. - * - * We need one key to register the location of the header, and we need - * nworkers + 1 keys to track the locations of the message queues. - */ - shm_toc_initialize_estimator(&e); - shm_toc_estimate_chunk(&e, sizeof(test_shm_mq_header)); - for (i = 0; i <= nworkers; ++i) - shm_toc_estimate_chunk(&e, (Size) queue_size); - shm_toc_estimate_keys(&e, 2 + nworkers); - segsize = shm_toc_estimate(&e); - - /* Create the shared memory segment and establish a table of contents. */ - seg = dsm_create(shm_toc_estimate(&e)); - toc = shm_toc_create(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg), - segsize); - - /* Set up the header region. */ - hdr = shm_toc_allocate(toc, sizeof(test_shm_mq_header)); - SpinLockInit(&hdr->mutex); - hdr->workers_total = nworkers; - hdr->workers_attached = 0; - hdr->workers_ready = 0; - shm_toc_insert(toc, 0, hdr); - - /* Set up one message queue per worker, plus one. */ - for (i = 0; i <= nworkers; ++i) - { - shm_mq *mq; - - mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size), - (Size) queue_size); - shm_toc_insert(toc, i + 1, mq); - - if (i == 0) - { - /* We send messages to the first queue. */ - shm_mq_set_sender(mq, MyProc); - *outp = mq; - } - if (i == nworkers) - { - /* We receive messages from the last queue. */ - shm_mq_set_receiver(mq, MyProc); - *inp = mq; - } - } - - /* Return results to caller. */ - *segp = seg; - *hdrp = hdr; -} - -/* - * Register background workers. - */ -static worker_state * -setup_background_workers(int nworkers, dsm_segment *seg) -{ - MemoryContext oldcontext; - BackgroundWorker worker; - worker_state *wstate; - int i; - - /* - * We need the worker_state object and the background worker handles to - * which it points to be allocated in CurTransactionContext rather than - * ExprContext; otherwise, they'll be destroyed before the on_dsm_detach - * hooks run. - */ - oldcontext = MemoryContextSwitchTo(CurTransactionContext); - - /* Create worker state object. */ - wstate = MemoryContextAlloc(TopTransactionContext, - offsetof(worker_state, handle) + - sizeof(BackgroundWorkerHandle *) * nworkers); - wstate->nworkers = 0; - - /* - * Arrange to kill all the workers if we abort before all workers are - * finished hooking themselves up to the dynamic shared memory segment. - * - * If we die after all the workers have finished hooking themselves up to - * the dynamic shared memory segment, we'll mark the two queues to which - * we're directly connected as detached, and the worker(s) connected to - * those queues will exit, marking any other queues to which they are - * connected as detached. This will cause any as-yet-unaware workers - * connected to those queues to exit in their turn, and so on, until - * everybody exits. - * - * But suppose the workers which are supposed to connect to the queues to - * which we're directly attached exit due to some error before they - * actually attach the queues. The remaining workers will have no way of - * knowing this. From their perspective, they're still waiting for those - * workers to start, when in fact they've already died. - */ - on_dsm_detach(seg, cleanup_background_workers, - PointerGetDatum(wstate)); - - /* Configure a worker. */ - worker.bgw_flags = BGWORKER_SHMEM_ACCESS; - worker.bgw_start_time = BgWorkerStart_ConsistentState; - worker.bgw_restart_time = BGW_NEVER_RESTART; - worker.bgw_main = NULL; /* new worker might not have library loaded */ - sprintf(worker.bgw_library_name, "test_shm_mq"); - sprintf(worker.bgw_function_name, "test_shm_mq_main"); - snprintf(worker.bgw_name, BGW_MAXLEN, "test_shm_mq"); - worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg)); - /* set bgw_notify_pid, so we can detect if the worker stops */ - worker.bgw_notify_pid = MyProcPid; - - /* Register the workers. */ - for (i = 0; i < nworkers; ++i) - { - if (!RegisterDynamicBackgroundWorker(&worker, &wstate->handle[i])) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_RESOURCES), - errmsg("could not register background process"), - errhint("You may need to increase max_worker_processes."))); - ++wstate->nworkers; - } - - /* All done. */ - MemoryContextSwitchTo(oldcontext); - return wstate; -} - -static void -cleanup_background_workers(dsm_segment *seg, Datum arg) -{ - worker_state *wstate = (worker_state *) DatumGetPointer(arg); - - while (wstate->nworkers > 0) - { - --wstate->nworkers; - TerminateBackgroundWorker(wstate->handle[wstate->nworkers]); - } -} - -static void -wait_for_workers_to_become_ready(worker_state *wstate, - volatile test_shm_mq_header *hdr) -{ - bool save_set_latch_on_sigusr1; - bool result = false; - - save_set_latch_on_sigusr1 = set_latch_on_sigusr1; - set_latch_on_sigusr1 = true; - - PG_TRY(); - { - for (;;) - { - int workers_ready; - - /* If all the workers are ready, we have succeeded. */ - SpinLockAcquire(&hdr->mutex); - workers_ready = hdr->workers_ready; - SpinLockRelease(&hdr->mutex); - if (workers_ready >= wstate->nworkers) - { - result = true; - break; - } - - /* If any workers (or the postmaster) have died, we have failed. */ - if (!check_worker_status(wstate)) - { - result = false; - break; - } - - /* Wait to be signalled. */ - WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0); - - /* An interrupt may have occurred while we were waiting. */ - CHECK_FOR_INTERRUPTS(); - - /* Reset the latch so we don't spin. */ - ResetLatch(&MyProc->procLatch); - } - } - PG_CATCH(); - { - set_latch_on_sigusr1 = save_set_latch_on_sigusr1; - PG_RE_THROW(); - } - PG_END_TRY(); - - if (!result) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_RESOURCES), - errmsg("one or more background workers failed to start"))); -} - -static bool -check_worker_status(worker_state *wstate) -{ - int n; - - /* If any workers (or the postmaster) have died, we have failed. */ - for (n = 0; n < wstate->nworkers; ++n) - { - BgwHandleStatus status; - pid_t pid; - - status = GetBackgroundWorkerPid(wstate->handle[n], &pid); - if (status == BGWH_STOPPED || status == BGWH_POSTMASTER_DIED) - return false; - } - - /* Otherwise, things still look OK. */ - return true; -} diff --git a/contrib/test_shm_mq/sql/test_shm_mq.sql b/contrib/test_shm_mq/sql/test_shm_mq.sql deleted file mode 100644 index 9de19d304a2..00000000000 --- a/contrib/test_shm_mq/sql/test_shm_mq.sql +++ /dev/null @@ -1,12 +0,0 @@ -CREATE EXTENSION test_shm_mq; - --- --- These tests don't produce any interesting output. We're checking that --- the operations complete without crashing or hanging and that none of their --- internal sanity tests fail. --- -SELECT test_shm_mq(1024, '', 2000, 1); -SELECT test_shm_mq(1024, 'a', 2001, 1); -SELECT test_shm_mq(32768, (select string_agg(chr(32+(random()*95)::int), '') from generate_series(1,(100+900*random())::int)), 10000, 1); -SELECT test_shm_mq(100, (select string_agg(chr(32+(random()*95)::int), '') from generate_series(1,(100+200*random())::int)), 10000, 1); -SELECT test_shm_mq_pipelined(16384, (select string_agg(chr(32+(random()*95)::int), '') from generate_series(1,270000)), 200, 3); diff --git a/contrib/test_shm_mq/test.c b/contrib/test_shm_mq/test.c deleted file mode 100644 index 95d620f7569..00000000000 --- a/contrib/test_shm_mq/test.c +++ /dev/null @@ -1,264 +0,0 @@ -/*-------------------------------------------------------------------------- - * - * test.c - * Test harness code for shared memory message queues. - * - * Copyright (C) 2013, PostgreSQL Global Development Group - * - * IDENTIFICATION - * contrib/test_shm_mq/test.c - * - * ------------------------------------------------------------------------- - */ - -#include "postgres.h" - -#include "fmgr.h" -#include "miscadmin.h" - -#include "test_shm_mq.h" - -PG_MODULE_MAGIC; - -PG_FUNCTION_INFO_V1(test_shm_mq); -PG_FUNCTION_INFO_V1(test_shm_mq_pipelined); - -void _PG_init(void); - -static void verify_message(Size origlen, char *origdata, Size newlen, - char *newdata); - -/* - * Simple test of the shared memory message queue infrastructure. - * - * We set up a ring of message queues passing through 1 or more background - * processes and eventually looping back to ourselves. We then send a message - * through the ring a number of times indicated by the loop count. At the end, - * we check whether the final message matches the one we started with. - */ -Datum -test_shm_mq(PG_FUNCTION_ARGS) -{ - int64 queue_size = PG_GETARG_INT64(0); - text *message = PG_GETARG_TEXT_PP(1); - char *message_contents = VARDATA_ANY(message); - int message_size = VARSIZE_ANY_EXHDR(message); - int32 loop_count = PG_GETARG_INT32(2); - int32 nworkers = PG_GETARG_INT32(3); - dsm_segment *seg; - shm_mq_handle *outqh; - shm_mq_handle *inqh; - shm_mq_result res; - Size len; - void *data; - - /* A negative loopcount is nonsensical. */ - if (loop_count < 0) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("repeat count size must be a non-negative integer"))); - - /* - * Since this test sends data using the blocking interfaces, it cannot - * send data to itself. Therefore, a minimum of 1 worker is required. Of - * course, a negative worker count is nonsensical. - */ - if (nworkers < 1) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("number of workers must be a positive integer"))); - - /* Set up dynamic shared memory segment and background workers. */ - test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh); - - /* Send the initial message. */ - res = shm_mq_send(outqh, message_size, message_contents, false); - if (res != SHM_MQ_SUCCESS) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("could not send message"))); - - /* - * Receive a message and send it back out again. Do this a number of - * times equal to the loop count. - */ - for (;;) - { - /* Receive a message. */ - res = shm_mq_receive(inqh, &len, &data, false); - if (res != SHM_MQ_SUCCESS) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("could not receive message"))); - - /* If this is supposed to be the last iteration, stop here. */ - if (--loop_count <= 0) - break; - - /* Send it back out. */ - res = shm_mq_send(outqh, len, data, false); - if (res != SHM_MQ_SUCCESS) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("could not send message"))); - } - - /* - * Finally, check that we got back the same message from the last - * iteration that we originally sent. - */ - verify_message(message_size, message_contents, len, data); - - /* Clean up. */ - dsm_detach(seg); - - PG_RETURN_VOID(); -} - -/* - * Pipelined test of the shared memory message queue infrastructure. - * - * As in the basic test, we set up a ring of message queues passing through - * 1 or more background processes and eventually looping back to ourselves. - * Then, we send N copies of the user-specified message through the ring and - * receive them all back. Since this might fill up all message queues in the - * ring and then stall, we must be prepared to begin receiving the messages - * back before we've finished sending them. - */ -Datum -test_shm_mq_pipelined(PG_FUNCTION_ARGS) -{ - int64 queue_size = PG_GETARG_INT64(0); - text *message = PG_GETARG_TEXT_PP(1); - char *message_contents = VARDATA_ANY(message); - int message_size = VARSIZE_ANY_EXHDR(message); - int32 loop_count = PG_GETARG_INT32(2); - int32 nworkers = PG_GETARG_INT32(3); - bool verify = PG_GETARG_BOOL(4); - int32 send_count = 0; - int32 receive_count = 0; - dsm_segment *seg; - shm_mq_handle *outqh; - shm_mq_handle *inqh; - shm_mq_result res; - Size len; - void *data; - - /* A negative loopcount is nonsensical. */ - if (loop_count < 0) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("repeat count size must be a non-negative integer"))); - - /* - * Using the nonblocking interfaces, we can even send data to ourselves, - * so the minimum number of workers for this test is zero. - */ - if (nworkers < 0) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("number of workers must be a non-negative integer"))); - - /* Set up dynamic shared memory segment and background workers. */ - test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh); - - /* Main loop. */ - for (;;) - { - bool wait = true; - - /* - * If we haven't yet sent the message the requisite number of times, - * try again to send it now. Note that when shm_mq_send() returns - * SHM_MQ_WOULD_BLOCK, the next call to that function must pass the - * same message size and contents; that's not an issue here because - * we're sending the same message every time. - */ - if (send_count < loop_count) - { - res = shm_mq_send(outqh, message_size, message_contents, true); - if (res == SHM_MQ_SUCCESS) - { - ++send_count; - wait = false; - } - else if (res == SHM_MQ_DETACHED) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("could not send message"))); - } - - /* - * If we haven't yet received the message the requisite number of - * times, try to receive it again now. - */ - if (receive_count < loop_count) - { - res = shm_mq_receive(inqh, &len, &data, true); - if (res == SHM_MQ_SUCCESS) - { - ++receive_count; - /* Verifying every time is slow, so it's optional. */ - if (verify) - verify_message(message_size, message_contents, len, data); - wait = false; - } - else if (res == SHM_MQ_DETACHED) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("could not receive message"))); - } - else - { - /* - * Otherwise, we've received the message enough times. This - * shouldn't happen unless we've also sent it enough times. - */ - if (send_count != receive_count) - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("message sent %d times, but received %d times", - send_count, receive_count))); - break; - } - - if (wait) - { - /* - * If we made no progress, wait for one of the other processes to - * which we are connected to set our latch, indicating that they - * have read or written data and therefore there may now be work - * for us to do. - */ - WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0); - CHECK_FOR_INTERRUPTS(); - ResetLatch(&MyProc->procLatch); - } - } - - /* Clean up. */ - dsm_detach(seg); - - PG_RETURN_VOID(); -} - -/* - * Verify that two messages are the same. - */ -static void -verify_message(Size origlen, char *origdata, Size newlen, char *newdata) -{ - Size i; - - if (origlen != newlen) - ereport(ERROR, - (errmsg("message corrupted"), - errdetail("The original message was %zu bytes but the final message is %zu bytes.", - origlen, newlen))); - - for (i = 0; i < origlen; ++i) - if (origdata[i] != newdata[i]) - ereport(ERROR, - (errmsg("message corrupted"), - errdetail("The new and original messages differ at byte %zu of %zu.", i, origlen))); -} diff --git a/contrib/test_shm_mq/test_shm_mq--1.0.sql b/contrib/test_shm_mq/test_shm_mq--1.0.sql deleted file mode 100644 index 54b225e2ae0..00000000000 --- a/contrib/test_shm_mq/test_shm_mq--1.0.sql +++ /dev/null @@ -1,19 +0,0 @@ -/* contrib/test_shm_mq/test_shm_mq--1.0.sql */ - --- complain if script is sourced in psql, rather than via CREATE EXTENSION -\echo Use "CREATE EXTENSION test_shm_mq" to load this file. \quit - -CREATE FUNCTION test_shm_mq(queue_size pg_catalog.int8, - message pg_catalog.text, - repeat_count pg_catalog.int4 default 1, - num_workers pg_catalog.int4 default 1) - RETURNS pg_catalog.void STRICT - AS 'MODULE_PATHNAME' LANGUAGE C; - -CREATE FUNCTION test_shm_mq_pipelined(queue_size pg_catalog.int8, - message pg_catalog.text, - repeat_count pg_catalog.int4 default 1, - num_workers pg_catalog.int4 default 1, - verify pg_catalog.bool default true) - RETURNS pg_catalog.void STRICT - AS 'MODULE_PATHNAME' LANGUAGE C; diff --git a/contrib/test_shm_mq/test_shm_mq.control b/contrib/test_shm_mq/test_shm_mq.control deleted file mode 100644 index d9a74c7a323..00000000000 --- a/contrib/test_shm_mq/test_shm_mq.control +++ /dev/null @@ -1,4 +0,0 @@ -comment = 'Test code for shared memory message queues' -default_version = '1.0' -module_pathname = '$libdir/test_shm_mq' -relocatable = true diff --git a/contrib/test_shm_mq/test_shm_mq.h b/contrib/test_shm_mq/test_shm_mq.h deleted file mode 100644 index 04a7931fa94..00000000000 --- a/contrib/test_shm_mq/test_shm_mq.h +++ /dev/null @@ -1,45 +0,0 @@ -/*-------------------------------------------------------------------------- - * - * test_shm_mq.h - * Definitions for shared memory message queues - * - * Copyright (C) 2013, PostgreSQL Global Development Group - * - * IDENTIFICATION - * contrib/test_shm_mq/test_shm_mq.h - * - * ------------------------------------------------------------------------- - */ - -#ifndef TEST_SHM_MQ_H -#define TEST_SHM_MQ_H - -#include "storage/dsm.h" -#include "storage/shm_mq.h" -#include "storage/spin.h" - -/* Identifier for shared memory segments used by this extension. */ -#define PG_TEST_SHM_MQ_MAGIC 0x79fb2447 - -/* - * This structure is stored in the dynamic shared memory segment. We use - * it to determine whether all workers started up OK and successfully - * attached to their respective shared message queues. - */ -typedef struct -{ - slock_t mutex; - int workers_total; - int workers_attached; - int workers_ready; -} test_shm_mq_header; - -/* Set up dynamic shared memory and background workers for test run. */ -extern void test_shm_mq_setup(int64 queue_size, int32 nworkers, - dsm_segment **seg, shm_mq_handle **output, - shm_mq_handle **input); - -/* Main entrypoint for a worker. */ -extern void test_shm_mq_main(Datum) __attribute__((noreturn)); - -#endif diff --git a/contrib/test_shm_mq/worker.c b/contrib/test_shm_mq/worker.c deleted file mode 100644 index 0d66c92ddb1..00000000000 --- a/contrib/test_shm_mq/worker.c +++ /dev/null @@ -1,224 +0,0 @@ -/*-------------------------------------------------------------------------- - * - * worker.c - * Code for sample worker making use of shared memory message queues. - * Our test worker simply reads messages from one message queue and - * writes them back out to another message queue. In a real - * application, you'd presumably want the worker to do some more - * complex calculation rather than simply returning the input, - * but it should be possible to use much of the control logic just - * as presented here. - * - * Copyright (C) 2013, PostgreSQL Global Development Group - * - * IDENTIFICATION - * contrib/test_shm_mq/worker.c - * - * ------------------------------------------------------------------------- - */ - -#include "postgres.h" - -#include "miscadmin.h" -#include "storage/ipc.h" -#include "storage/procarray.h" -#include "storage/shm_mq.h" -#include "storage/shm_toc.h" -#include "utils/resowner.h" - -#include "test_shm_mq.h" - -static void handle_sigterm(SIGNAL_ARGS); -static void attach_to_queues(dsm_segment *seg, shm_toc *toc, - int myworkernumber, shm_mq_handle **inqhp, - shm_mq_handle **outqhp); -static void copy_messages(shm_mq_handle *inqh, shm_mq_handle *outqh); - -/* - * Background worker entrypoint. - * - * This is intended to demonstrate how a background worker can be used to - * facilitate a parallel computation. Most of the logic here is fairly - * boilerplate stuff, designed to attach to the shared memory segment, - * notify the user backend that we're alive, and so on. The - * application-specific bits of logic that you'd replace for your own worker - * are attach_to_queues() and copy_messages(). - */ -void -test_shm_mq_main(Datum main_arg) -{ - dsm_segment *seg; - shm_toc *toc; - shm_mq_handle *inqh; - shm_mq_handle *outqh; - volatile test_shm_mq_header *hdr; - int myworkernumber; - PGPROC *registrant; - - /* - * Establish signal handlers. - * - * We want CHECK_FOR_INTERRUPTS() to kill off this worker process just as - * it would a normal user backend. To make that happen, we establish a - * signal handler that is a stripped-down version of die(). We don't have - * any equivalent of the backend's command-read loop, where interrupts can - * be processed immediately, so make sure ImmediateInterruptOK is turned - * off. - */ - pqsignal(SIGTERM, handle_sigterm); - ImmediateInterruptOK = false; - BackgroundWorkerUnblockSignals(); - - /* - * Connect to the dynamic shared memory segment. - * - * The backend that registered this worker passed us the ID of a shared - * memory segment to which we must attach for further instructions. In - * order to attach to dynamic shared memory, we need a resource owner. - * Once we've mapped the segment in our address space, attach to the table - * of contents so we can locate the various data structures we'll need to - * find within the segment. - */ - CurrentResourceOwner = ResourceOwnerCreate(NULL, "test_shm_mq worker"); - seg = dsm_attach(DatumGetInt32(main_arg)); - if (seg == NULL) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("unable to map dynamic shared memory segment"))); - toc = shm_toc_attach(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg)); - if (toc == NULL) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("bad magic number in dynamic shared memory segment"))); - - /* - * Acquire a worker number. - * - * By convention, the process registering this background worker should - * have stored the control structure at key 0. We look up that key to - * find it. Our worker number gives our identity: there may be just one - * worker involved in this parallel operation, or there may be many. - */ - hdr = shm_toc_lookup(toc, 0); - SpinLockAcquire(&hdr->mutex); - myworkernumber = ++hdr->workers_attached; - SpinLockRelease(&hdr->mutex); - if (myworkernumber > hdr->workers_total) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("too many message queue testing workers already"))); - - /* - * Attach to the appropriate message queues. - */ - attach_to_queues(seg, toc, myworkernumber, &inqh, &outqh); - - /* - * Indicate that we're fully initialized and ready to begin the main part - * of the parallel operation. - * - * Once we signal that we're ready, the user backend is entitled to assume - * that our on_dsm_detach callbacks will fire before we disconnect from - * the shared memory segment and exit. Generally, that means we must have - * attached to all relevant dynamic shared memory data structures by now. - */ - SpinLockAcquire(&hdr->mutex); - ++hdr->workers_ready; - SpinLockRelease(&hdr->mutex); - registrant = BackendPidGetProc(MyBgworkerEntry->bgw_notify_pid); - if (registrant == NULL) - { - elog(DEBUG1, "registrant backend has exited prematurely"); - proc_exit(1); - } - SetLatch(®istrant->procLatch); - - /* Do the work. */ - copy_messages(inqh, outqh); - - /* - * We're done. Explicitly detach the shared memory segment so that we - * don't get a resource leak warning at commit time. This will fire any - * on_dsm_detach callbacks we've registered, as well. Once that's done, - * we can go ahead and exit. - */ - dsm_detach(seg); - proc_exit(1); -} - -/* - * Attach to shared memory message queues. - * - * We use our worker number to determine to which queue we should attach. - * The queues are registered at keys 1..<number-of-workers>. The user backend - * writes to queue #1 and reads from queue #<number-of-workers>; each worker - * reads from the queue whose number is equal to its worker number and writes - * to the next higher-numbered queue. - */ -static void -attach_to_queues(dsm_segment *seg, shm_toc *toc, int myworkernumber, - shm_mq_handle **inqhp, shm_mq_handle **outqhp) -{ - shm_mq *inq; - shm_mq *outq; - - inq = shm_toc_lookup(toc, myworkernumber); - shm_mq_set_receiver(inq, MyProc); - *inqhp = shm_mq_attach(inq, seg, NULL); - outq = shm_toc_lookup(toc, myworkernumber + 1); - shm_mq_set_sender(outq, MyProc); - *outqhp = shm_mq_attach(outq, seg, NULL); -} - -/* - * Loop, receiving and sending messages, until the connection is broken. - * - * This is the "real work" performed by this worker process. Everything that - * happens before this is initialization of one form or another, and everything - * after this point is cleanup. - */ -static void -copy_messages(shm_mq_handle *inqh, shm_mq_handle *outqh) -{ - Size len; - void *data; - shm_mq_result res; - - for (;;) - { - /* Notice any interrupts that have occurred. */ - CHECK_FOR_INTERRUPTS(); - - /* Receive a message. */ - res = shm_mq_receive(inqh, &len, &data, false); - if (res != SHM_MQ_SUCCESS) - break; - - /* Send it back out. */ - res = shm_mq_send(outqh, len, data, false); - if (res != SHM_MQ_SUCCESS) - break; - } -} - -/* - * When we receive a SIGTERM, we set InterruptPending and ProcDiePending just - * like a normal backend. The next CHECK_FOR_INTERRUPTS() will do the right - * thing. - */ -static void -handle_sigterm(SIGNAL_ARGS) -{ - int save_errno = errno; - - if (MyProc) - SetLatch(&MyProc->procLatch); - - if (!proc_exit_inprogress) - { - InterruptPending = true; - ProcDiePending = true; - } - - errno = save_errno; -} diff --git a/contrib/worker_spi/Makefile b/contrib/worker_spi/Makefile deleted file mode 100644 index 5cce4d1ef9e..00000000000 --- a/contrib/worker_spi/Makefile +++ /dev/null @@ -1,18 +0,0 @@ -# contrib/worker_spi/Makefile - -MODULES = worker_spi - -EXTENSION = worker_spi -DATA = worker_spi--1.0.sql -PGFILEDESC = "worker_spi - background worker example" - -ifdef USE_PGXS -PG_CONFIG = pg_config -PGXS := $(shell $(PG_CONFIG) --pgxs) -include $(PGXS) -else -subdir = contrib/worker_spi -top_builddir = ../.. -include $(top_builddir)/src/Makefile.global -include $(top_srcdir)/contrib/contrib-global.mk -endif diff --git a/contrib/worker_spi/worker_spi--1.0.sql b/contrib/worker_spi/worker_spi--1.0.sql deleted file mode 100644 index 09b7799f2c9..00000000000 --- a/contrib/worker_spi/worker_spi--1.0.sql +++ /dev/null @@ -1,9 +0,0 @@ -/* contrib/worker_spi/worker_spi--1.0.sql */ - --- complain if script is sourced in psql, rather than via CREATE EXTENSION -\echo Use "CREATE EXTENSION worker_spi" to load this file. \quit - -CREATE FUNCTION worker_spi_launch(pg_catalog.int4) -RETURNS pg_catalog.int4 STRICT -AS 'MODULE_PATHNAME' -LANGUAGE C; diff --git a/contrib/worker_spi/worker_spi.c b/contrib/worker_spi/worker_spi.c deleted file mode 100644 index 328c722c359..00000000000 --- a/contrib/worker_spi/worker_spi.c +++ /dev/null @@ -1,407 +0,0 @@ -/* ------------------------------------------------------------------------- - * - * worker_spi.c - * Sample background worker code that demonstrates various coding - * patterns: establishing a database connection; starting and committing - * transactions; using GUC variables, and heeding SIGHUP to reread - * the configuration file; reporting to pg_stat_activity; using the - * process latch to sleep and exit in case of postmaster death. - * - * This code connects to a database, creates a schema and table, and summarizes - * the numbers contained therein. To see it working, insert an initial value - * with "total" type and some initial value; then insert some other rows with - * "delta" type. Delta rows will be deleted by this worker and their values - * aggregated into the total. - * - * Copyright (C) 2013, PostgreSQL Global Development Group - * - * IDENTIFICATION - * contrib/worker_spi/worker_spi.c - * - * ------------------------------------------------------------------------- - */ -#include "postgres.h" - -/* These are always necessary for a bgworker */ -#include "miscadmin.h" -#include "postmaster/bgworker.h" -#include "storage/ipc.h" -#include "storage/latch.h" -#include "storage/lwlock.h" -#include "storage/proc.h" -#include "storage/shmem.h" - -/* these headers are used by this particular worker's code */ -#include "access/xact.h" -#include "executor/spi.h" -#include "fmgr.h" -#include "lib/stringinfo.h" -#include "pgstat.h" -#include "utils/builtins.h" -#include "utils/snapmgr.h" -#include "tcop/utility.h" - -PG_MODULE_MAGIC; - -PG_FUNCTION_INFO_V1(worker_spi_launch); - -void _PG_init(void); -void worker_spi_main(Datum) __attribute__((noreturn)); - -/* flags set by signal handlers */ -static volatile sig_atomic_t got_sighup = false; -static volatile sig_atomic_t got_sigterm = false; - -/* GUC variables */ -static int worker_spi_naptime = 10; -static int worker_spi_total_workers = 2; - - -typedef struct worktable -{ - const char *schema; - const char *name; -} worktable; - -/* - * Signal handler for SIGTERM - * Set a flag to let the main loop to terminate, and set our latch to wake - * it up. - */ -static void -worker_spi_sigterm(SIGNAL_ARGS) -{ - int save_errno = errno; - - got_sigterm = true; - if (MyProc) - SetLatch(&MyProc->procLatch); - - errno = save_errno; -} - -/* - * Signal handler for SIGHUP - * Set a flag to tell the main loop to reread the config file, and set - * our latch to wake it up. - */ -static void -worker_spi_sighup(SIGNAL_ARGS) -{ - int save_errno = errno; - - got_sighup = true; - if (MyProc) - SetLatch(&MyProc->procLatch); - - errno = save_errno; -} - -/* - * Initialize workspace for a worker process: create the schema if it doesn't - * already exist. - */ -static void -initialize_worker_spi(worktable *table) -{ - int ret; - int ntup; - bool isnull; - StringInfoData buf; - - SetCurrentStatementStartTimestamp(); - StartTransactionCommand(); - SPI_connect(); - PushActiveSnapshot(GetTransactionSnapshot()); - pgstat_report_activity(STATE_RUNNING, "initializing spi_worker schema"); - - /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */ - initStringInfo(&buf); - appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'", - table->schema); - - ret = SPI_execute(buf.data, true, 0); - if (ret != SPI_OK_SELECT) - elog(FATAL, "SPI_execute failed: error code %d", ret); - - if (SPI_processed != 1) - elog(FATAL, "not a singleton result"); - - ntup = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0], - SPI_tuptable->tupdesc, - 1, &isnull)); - if (isnull) - elog(FATAL, "null result"); - - if (ntup == 0) - { - resetStringInfo(&buf); - appendStringInfo(&buf, - "CREATE SCHEMA \"%s\" " - "CREATE TABLE \"%s\" (" - " type text CHECK (type IN ('total', 'delta')), " - " value integer)" - "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) " - "WHERE type = 'total'", - table->schema, table->name, table->name, table->name); - - /* set statement start time */ - SetCurrentStatementStartTimestamp(); - - ret = SPI_execute(buf.data, false, 0); - - if (ret != SPI_OK_UTILITY) - elog(FATAL, "failed to create my schema"); - } - - SPI_finish(); - PopActiveSnapshot(); - CommitTransactionCommand(); - pgstat_report_activity(STATE_IDLE, NULL); -} - -void -worker_spi_main(Datum main_arg) -{ - int index = DatumGetInt32(main_arg); - worktable *table; - StringInfoData buf; - char name[20]; - - table = palloc(sizeof(worktable)); - sprintf(name, "schema%d", index); - table->schema = pstrdup(name); - table->name = pstrdup("counted"); - - /* Establish signal handlers before unblocking signals. */ - pqsignal(SIGHUP, worker_spi_sighup); - pqsignal(SIGTERM, worker_spi_sigterm); - - /* We're now ready to receive signals */ - BackgroundWorkerUnblockSignals(); - - /* Connect to our database */ - BackgroundWorkerInitializeConnection("postgres", NULL); - - elog(LOG, "%s initialized with %s.%s", - MyBgworkerEntry->bgw_name, table->schema, table->name); - initialize_worker_spi(table); - - /* - * Quote identifiers passed to us. Note that this must be done after - * initialize_worker_spi, because that routine assumes the names are not - * quoted. - * - * Note some memory might be leaked here. - */ - table->schema = quote_identifier(table->schema); - table->name = quote_identifier(table->name); - - initStringInfo(&buf); - appendStringInfo(&buf, - "WITH deleted AS (DELETE " - "FROM %s.%s " - "WHERE type = 'delta' RETURNING value), " - "total AS (SELECT coalesce(sum(value), 0) as sum " - "FROM deleted) " - "UPDATE %s.%s " - "SET value = %s.value + total.sum " - "FROM total WHERE type = 'total' " - "RETURNING %s.value", - table->schema, table->name, - table->schema, table->name, - table->name, - table->name); - - /* - * Main loop: do this until the SIGTERM handler tells us to terminate - */ - while (!got_sigterm) - { - int ret; - int rc; - - /* - * Background workers mustn't call usleep() or any direct equivalent: - * instead, they may wait on their process latch, which sleeps as - * necessary, but is awakened if postmaster dies. That way the - * background process goes away immediately in an emergency. - */ - rc = WaitLatch(&MyProc->procLatch, - WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, - worker_spi_naptime * 1000L); - ResetLatch(&MyProc->procLatch); - - /* emergency bailout if postmaster has died */ - if (rc & WL_POSTMASTER_DEATH) - proc_exit(1); - - /* - * In case of a SIGHUP, just reload the configuration. - */ - if (got_sighup) - { - got_sighup = false; - ProcessConfigFile(PGC_SIGHUP); - } - - /* - * Start a transaction on which we can run queries. Note that each - * StartTransactionCommand() call should be preceded by a - * SetCurrentStatementStartTimestamp() call, which sets both the time - * for the statement we're about the run, and also the transaction - * start time. Also, each other query sent to SPI should probably be - * preceded by SetCurrentStatementStartTimestamp(), so that statement - * start time is always up to date. - * - * The SPI_connect() call lets us run queries through the SPI manager, - * and the PushActiveSnapshot() call creates an "active" snapshot - * which is necessary for queries to have MVCC data to work on. - * - * The pgstat_report_activity() call makes our activity visible - * through the pgstat views. - */ - SetCurrentStatementStartTimestamp(); - StartTransactionCommand(); - SPI_connect(); - PushActiveSnapshot(GetTransactionSnapshot()); - pgstat_report_activity(STATE_RUNNING, buf.data); - - /* We can now execute queries via SPI */ - ret = SPI_execute(buf.data, false, 0); - - if (ret != SPI_OK_UPDATE_RETURNING) - elog(FATAL, "cannot select from table %s.%s: error code %d", - table->schema, table->name, ret); - - if (SPI_processed > 0) - { - bool isnull; - int32 val; - - val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0], - SPI_tuptable->tupdesc, - 1, &isnull)); - if (!isnull) - elog(LOG, "%s: count in %s.%s is now %d", - MyBgworkerEntry->bgw_name, - table->schema, table->name, val); - } - - /* - * And finish our transaction. - */ - SPI_finish(); - PopActiveSnapshot(); - CommitTransactionCommand(); - pgstat_report_activity(STATE_IDLE, NULL); - } - - proc_exit(1); -} - -/* - * Entrypoint of this module. - * - * We register more than one worker process here, to demonstrate how that can - * be done. - */ -void -_PG_init(void) -{ - BackgroundWorker worker; - unsigned int i; - - /* get the configuration */ - DefineCustomIntVariable("worker_spi.naptime", - "Duration between each check (in seconds).", - NULL, - &worker_spi_naptime, - 10, - 1, - INT_MAX, - PGC_SIGHUP, - 0, - NULL, - NULL, - NULL); - - if (!process_shared_preload_libraries_in_progress) - return; - - DefineCustomIntVariable("worker_spi.total_workers", - "Number of workers.", - NULL, - &worker_spi_total_workers, - 2, - 1, - 100, - PGC_POSTMASTER, - 0, - NULL, - NULL, - NULL); - - /* set up common data for all our workers */ - worker.bgw_flags = BGWORKER_SHMEM_ACCESS | - BGWORKER_BACKEND_DATABASE_CONNECTION; - worker.bgw_start_time = BgWorkerStart_RecoveryFinished; - worker.bgw_restart_time = BGW_NEVER_RESTART; - worker.bgw_main = worker_spi_main; - worker.bgw_notify_pid = 0; - - /* - * Now fill in worker-specific data, and do the actual registrations. - */ - for (i = 1; i <= worker_spi_total_workers; i++) - { - snprintf(worker.bgw_name, BGW_MAXLEN, "worker %d", i); - worker.bgw_main_arg = Int32GetDatum(i); - - RegisterBackgroundWorker(&worker); - } -} - -/* - * Dynamically launch an SPI worker. - */ -Datum -worker_spi_launch(PG_FUNCTION_ARGS) -{ - int32 i = PG_GETARG_INT32(0); - BackgroundWorker worker; - BackgroundWorkerHandle *handle; - BgwHandleStatus status; - pid_t pid; - - worker.bgw_flags = BGWORKER_SHMEM_ACCESS | - BGWORKER_BACKEND_DATABASE_CONNECTION; - worker.bgw_start_time = BgWorkerStart_RecoveryFinished; - worker.bgw_restart_time = BGW_NEVER_RESTART; - worker.bgw_main = NULL; /* new worker might not have library loaded */ - sprintf(worker.bgw_library_name, "worker_spi"); - sprintf(worker.bgw_function_name, "worker_spi_main"); - snprintf(worker.bgw_name, BGW_MAXLEN, "worker %d", i); - worker.bgw_main_arg = Int32GetDatum(i); - /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */ - worker.bgw_notify_pid = MyProcPid; - - if (!RegisterDynamicBackgroundWorker(&worker, &handle)) - PG_RETURN_NULL(); - - status = WaitForBackgroundWorkerStartup(handle, &pid); - - if (status == BGWH_STOPPED) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_RESOURCES), - errmsg("could not start background process"), - errhint("More details may be available in the server log."))); - if (status == BGWH_POSTMASTER_DIED) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_RESOURCES), - errmsg("cannot start background processes without postmaster"), - errhint("Kill all remaining database processes and restart the database."))); - Assert(status == BGWH_STARTED); - - PG_RETURN_INT32(pid); -} diff --git a/contrib/worker_spi/worker_spi.control b/contrib/worker_spi/worker_spi.control deleted file mode 100644 index 84d6294628a..00000000000 --- a/contrib/worker_spi/worker_spi.control +++ /dev/null @@ -1,5 +0,0 @@ -# worker_spi extension -comment = 'Sample background worker' -default_version = '1.0' -module_pathname = '$libdir/worker_spi' -relocatable = true |
