diff options
| author | Tom Lane <tgl@sss.pgh.pa.us> | 2010-02-16 22:34:57 +0000 | 
|---|---|---|
| committer | Tom Lane <tgl@sss.pgh.pa.us> | 2010-02-16 22:34:57 +0000 | 
| commit | d1e027221d0243b7b57eabb0e482923dd7d1c8eb (patch) | |
| tree | 034988b788248c88fad3b73fb4d8d1afff2dd509 /src | |
| parent | fc5173ad514a216dc93bc190dbba3751024a257d (diff) | |
Replace the pg_listener-based LISTEN/NOTIFY mechanism with an in-memory queue.
In addition, add support for a "payload" string to be passed along with
each notify event.
This implementation should be significantly more efficient than the old one,
and is also more compatible with Hot Standby usage.  There is not yet any
facility for HS slaves to receive notifications generated on the master,
although such a thing is possible in future.
Joachim Wieland, reviewed by Jeff Davis; also hacked on by me.
Diffstat (limited to 'src')
29 files changed, 1644 insertions, 578 deletions
| diff --git a/src/backend/access/transam/slru.c b/src/backend/access/transam/slru.c index d5f999f494b..6226acc9283 100644 --- a/src/backend/access/transam/slru.c +++ b/src/backend/access/transam/slru.c @@ -41,7 +41,7 @@   * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group   * Portions Copyright (c) 1994, Regents of the University of California   * - * $PostgreSQL: pgsql/src/backend/access/transam/slru.c,v 1.48 2010/01/02 16:57:35 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/access/transam/slru.c,v 1.49 2010/02/16 22:34:43 tgl Exp $   *   *-------------------------------------------------------------------------   */ @@ -59,25 +59,6 @@  #include "miscadmin.h" -/* - * Define segment size.  A page is the same BLCKSZ as is used everywhere - * else in Postgres.  The segment size can be chosen somewhat arbitrarily; - * we make it 32 pages by default, or 256Kb, i.e. 1M transactions for CLOG - * or 64K transactions for SUBTRANS. - * - * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF, - * page numbering also wraps around at 0xFFFFFFFF/xxxx_XACTS_PER_PAGE (where - * xxxx is CLOG or SUBTRANS, respectively), and segment numbering at - * 0xFFFFFFFF/xxxx_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need - * take no explicit notice of that fact in this module, except when comparing - * segment and page numbers in SimpleLruTruncate (see PagePrecedes()). - * - * Note: this file currently assumes that segment file names will be four - * hex digits.	This sets a lower bound on the segment size (64K transactions - * for 32-bit TransactionIds). - */ -#define SLRU_PAGES_PER_SEGMENT	32 -  #define SlruFileName(ctl, path, seg) \  	snprintf(path, MAXPGPATH, "%s/%04X", (ctl)->Dir, seg) @@ -183,6 +164,8 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,  	shared = (SlruShared) ShmemInitStruct(name,  										  SimpleLruShmemSize(nslots, nlsns),  										  &found); +	if (!shared) +		elog(ERROR, "out of shared memory");  	if (!IsUnderPostmaster)  	{ diff --git a/src/backend/access/transam/twophase_rmgr.c b/src/backend/access/transam/twophase_rmgr.c index 0ea8e42ca56..86a1d12f93d 100644 --- a/src/backend/access/transam/twophase_rmgr.c +++ b/src/backend/access/transam/twophase_rmgr.c @@ -8,7 +8,7 @@   *   *   * IDENTIFICATION - *	  $PostgreSQL: pgsql/src/backend/access/transam/twophase_rmgr.c,v 1.12 2010/01/02 16:57:35 momjian Exp $ + *	  $PostgreSQL: pgsql/src/backend/access/transam/twophase_rmgr.c,v 1.13 2010/02/16 22:34:43 tgl Exp $   *   *-------------------------------------------------------------------------   */ @@ -16,7 +16,6 @@  #include "access/multixact.h"  #include "access/twophase_rmgr.h" -#include "commands/async.h"  #include "pgstat.h"  #include "storage/lock.h" @@ -25,7 +24,6 @@ const TwoPhaseCallback twophase_recover_callbacks[TWOPHASE_RM_MAX_ID + 1] =  {  	NULL,						/* END ID */  	lock_twophase_recover,		/* Lock */ -	NULL,						/* notify/listen */  	NULL,						/* pgstat */  	multixact_twophase_recover	/* MultiXact */  }; @@ -34,7 +32,6 @@ const TwoPhaseCallback twophase_postcommit_callbacks[TWOPHASE_RM_MAX_ID + 1] =  {  	NULL,						/* END ID */  	lock_twophase_postcommit,	/* Lock */ -	notify_twophase_postcommit, /* notify/listen */  	pgstat_twophase_postcommit,	/* pgstat */  	multixact_twophase_postcommit /* MultiXact */  }; @@ -43,7 +40,6 @@ const TwoPhaseCallback twophase_postabort_callbacks[TWOPHASE_RM_MAX_ID + 1] =  {  	NULL,						/* END ID */  	lock_twophase_postabort,	/* Lock */ -	NULL,						/* notify/listen */  	pgstat_twophase_postabort,	/* pgstat */  	multixact_twophase_postabort /* MultiXact */  }; @@ -52,7 +48,6 @@ const TwoPhaseCallback twophase_standby_recover_callbacks[TWOPHASE_RM_MAX_ID + 1  {  	NULL,						/* END ID */  	lock_twophase_standby_recover,		/* Lock */ -	NULL,						/* notify/listen */  	NULL,						/* pgstat */  	NULL						/* MultiXact */  }; diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index c239c098fd6..46a842bb9a8 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -10,7 +10,7 @@   *   *   * IDENTIFICATION - *	  $PostgreSQL: pgsql/src/backend/access/transam/xact.c,v 1.285 2010/02/13 16:15:46 sriggs Exp $ + *	  $PostgreSQL: pgsql/src/backend/access/transam/xact.c,v 1.286 2010/02/16 22:34:43 tgl Exp $   *   *-------------------------------------------------------------------------   */ @@ -1736,8 +1736,12 @@ CommitTransaction(void)  	/* close large objects before lower-level cleanup */  	AtEOXact_LargeObject(true); -	/* NOTIFY commit must come before lower-level cleanup */ -	AtCommit_Notify(); +	/* +	 * Insert notifications sent by NOTIFY commands into the queue.  This +	 * should be late in the pre-commit sequence to minimize time spent +	 * holding the notify-insertion lock. +	 */ +	PreCommit_Notify();  	/* Prevent cancel/die interrupt while cleaning up */  	HOLD_INTERRUPTS(); @@ -1825,6 +1829,7 @@ CommitTransaction(void)  	/* Check we've released all catcache entries */  	AtEOXact_CatCache(true); +	AtCommit_Notify();  	AtEOXact_GUC(true, 1);  	AtEOXact_SPI(true);  	AtEOXact_on_commit_actions(true); diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile index 837865efbe5..a73971d1627 100644 --- a/src/backend/catalog/Makefile +++ b/src/backend/catalog/Makefile @@ -2,7 +2,7 @@  #  # Makefile for backend/catalog  # -# $PostgreSQL: pgsql/src/backend/catalog/Makefile,v 1.76 2010/01/06 19:56:29 tgl Exp $ +# $PostgreSQL: pgsql/src/backend/catalog/Makefile,v 1.77 2010/02/16 22:34:43 tgl Exp $  #  #------------------------------------------------------------------------- @@ -30,7 +30,7 @@ POSTGRES_BKI_SRCS = $(addprefix $(top_srcdir)/src/include/catalog/,\  	pg_attrdef.h pg_constraint.h pg_inherits.h pg_index.h pg_operator.h \  	pg_opfamily.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \  	pg_language.h pg_largeobject_metadata.h pg_largeobject.h pg_aggregate.h \ -	pg_statistic.h pg_rewrite.h pg_trigger.h pg_listener.h pg_description.h \ +	pg_statistic.h pg_rewrite.h pg_trigger.h pg_description.h \  	pg_cast.h pg_enum.h pg_namespace.h pg_conversion.h pg_depend.h \  	pg_database.h pg_db_role_setting.h pg_tablespace.h pg_pltemplate.h \  	pg_authid.h pg_auth_members.h pg_shdepend.h pg_shdescription.h \ diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index afc5e5fabc1..f5863480f5e 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -7,109 +7,280 @@   * Portions Copyright (c) 1994, Regents of the University of California   *   * IDENTIFICATION - *	  $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.150 2010/01/02 16:57:36 momjian Exp $ + *	  $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.151 2010/02/16 22:34:43 tgl Exp $   *   *-------------------------------------------------------------------------   */  /*------------------------------------------------------------------------- - * New Async Notification Model: - * 1. Multiple backends on same machine.  Multiple backends listening on - *	  one relation.  (Note: "listening on a relation" is not really the - *	  right way to think about it, since the notify names need not have - *	  anything to do with the names of relations actually in the database. - *	  But this terminology is all over the code and docs, and I don't feel - *	  like trying to replace it.) - * - * 2. There is a tuple in relation "pg_listener" for each active LISTEN, - *	  ie, each relname/listenerPID pair.  The "notification" field of the - *	  tuple is zero when no NOTIFY is pending for that listener, or the PID - *	  of the originating backend when a cross-backend NOTIFY is pending. - *	  (We skip writing to pg_listener when doing a self-NOTIFY, so the - *	  notification field should never be equal to the listenerPID field.) - * - * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target - *	  relname to a list of outstanding NOTIFY requests.  Actual processing - *	  happens if and only if we reach transaction commit.  At that time (in - *	  routine AtCommit_Notify) we scan pg_listener for matching relnames. - *	  If the listenerPID in a matching tuple is ours, we just send a notify - *	  message to our own front end.  If it is not ours, and "notification" - *	  is not already nonzero, we set notification to our own PID and send a - *	  PROCSIG_NOTIFY_INTERRUPT signal to the receiving process (indicated by - *	  listenerPID). - *	  BTW: if the signal operation fails, we presume that the listener backend - *	  crashed without removing this tuple, and remove the tuple for it. - * - * 4. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler + * Async Notification Model as of 9.0: + * + * 1. Multiple backends on same machine. Multiple backends listening on + *	  several channels. (Channels are also called "conditions" in other + *	  parts of the code.) + * + * 2. There is one central queue in disk-based storage (directory pg_notify/), + *	  with actively-used pages mapped into shared memory by the slru.c module. + *	  All notification messages are placed in the queue and later read out + *	  by listening backends. + * + *	  There is no central knowledge of which backend listens on which channel; + *	  every backend has its own list of interesting channels. + * + *	  Although there is only one queue, notifications are treated as being + *	  database-local; this is done by including the sender's database OID + *	  in each notification message.  Listening backends ignore messages + *	  that don't match their database OID.  This is important because it + *	  ensures senders and receivers have the same database encoding and won't + *	  misinterpret non-ASCII text in the channel name or payload string. + * + *	  Since notifications are not expected to survive database crashes, + *	  we can simply clean out the pg_notify data at any reboot, and there + *	  is no need for WAL support or fsync'ing. + * + * 3. Every backend that is listening on at least one channel registers by + *	  entering its PID into the array in AsyncQueueControl. It then scans all + *	  incoming notifications in the central queue and first compares the + *	  database OID of the notification with its own database OID and then + *	  compares the notified channel with the list of channels that it listens + *	  to. In case there is a match it delivers the notification event to its + *	  frontend.  Non-matching events are simply skipped. + * + * 4. The NOTIFY statement (routine Async_Notify) stores the notification in + *	  a backend-local list which will not be processed until transaction end. + * + *	  Duplicate notifications from the same transaction are sent out as one + *	  notification only. This is done to save work when for example a trigger + *	  on a 2 million row table fires a notification for each row that has been + *	  changed. If the application needs to receive every single notification + *	  that has been sent, it can easily add some unique string into the extra + *	  payload parameter. + * + *	  When the transaction is ready to commit, PreCommit_Notify() adds the + *	  pending notifications to the head of the queue. The head pointer of the + *	  queue always points to the next free position and a position is just a + *	  page number and the offset in that page. This is done before marking the + *	  transaction as committed in clog. If we run into problems writing the + *	  notifications, we can still call elog(ERROR, ...) and the transaction + *	  will roll back. + * + *	  Once we have put all of the notifications into the queue, we return to + *	  CommitTransaction() which will then do the actual transaction commit. + * + *	  After commit we are called another time (AtCommit_Notify()). Here we + *	  make the actual updates to the effective listen state (listenChannels). + * + *	  Finally, after we are out of the transaction altogether, we check if + *	  we need to signal listening backends.  In SignalBackends() we scan the + *	  list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal + *	  to every listening backend (we don't know which backend is listening on + *	  which channel so we must signal them all). We can exclude backends that + *	  are already up to date, though.  We don't bother with a self-signal + *	  either, but just process the queue directly. + * + * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler   *	  can call inbound-notify processing immediately if this backend is idle   *	  (ie, it is waiting for a frontend command and is not within a transaction   *	  block).  Otherwise the handler may only set a flag, which will cause the   *	  processing to occur just before we next go idle.   * - * 5. Inbound-notify processing consists of scanning pg_listener for tuples - *	  matching our own listenerPID and having nonzero notification fields. - *	  For each such tuple, we send a message to our frontend and clear the - *	  notification field.  BTW: this routine has to start/commit its own - *	  transaction, since by assumption it is only called from outside any - *	  transaction. - * - * Like NOTIFY, LISTEN and UNLISTEN just add the desired action to a list - * of pending actions.	If we reach transaction commit, the changes are - * applied to pg_listener just before executing any pending NOTIFYs.  This - * method is necessary because to avoid race conditions, we must hold lock - * on pg_listener from when we insert a new listener tuple until we commit. - * To do that and not create undue hazard of deadlock, we don't want to - * touch pg_listener until we are otherwise done with the transaction; - * in particular it'd be uncool to still be taking user-commanded locks - * while holding the pg_listener lock. - * - * Although we grab ExclusiveLock on pg_listener for any operation, - * the lock is never held very long, so it shouldn't cause too much of - * a performance problem.  (Previously we used AccessExclusiveLock, but - * there's no real reason to forbid concurrent reads.) - * - * An application that listens on the same relname it notifies will get + *	  Inbound-notify processing consists of reading all of the notifications + *	  that have arrived since scanning last time. We read every notification + *	  until we reach either a notification from an uncommitted transaction or + *	  the head pointer's position. Then we check if we were the laziest + *	  backend: if our pointer is set to the same position as the global tail + *	  pointer is set, then we move the global tail pointer ahead to where the + *	  second-laziest backend is (in general, we take the MIN of the current + *	  head position and all active backends' new tail pointers). Whenever we + *	  move the global tail pointer we also truncate now-unused pages (i.e., + *	  delete files in pg_notify/ that are no longer used). + * + * An application that listens on the same channel it notifies will get   * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,   * by comparing be_pid in the NOTIFY message to the application's own backend's   * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the   * frontend during startup.)  The above design guarantees that notifies from - * other backends will never be missed by ignoring self-notifies.  Note, - * however, that we do *not* guarantee that a separate frontend message will - * be sent for every outside NOTIFY.  Since there is only room for one - * originating PID in pg_listener, outside notifies occurring at about the - * same time may be collapsed into a single message bearing the PID of the - * first outside backend to perform the NOTIFY. + * other backends will never be missed by ignoring self-notifies. + * + * The amount of shared memory used for notify management (NUM_ASYNC_BUFFERS) + * can be varied without affecting anything but performance.  The maximum + * amount of notification data that can be queued at one time is determined + * by slru.c's wraparound limit; see QUEUE_MAX_PAGE below.   *-------------------------------------------------------------------------   */  #include "postgres.h" +#include <limits.h>  #include <unistd.h>  #include <signal.h> -#include "access/heapam.h" -#include "access/twophase_rmgr.h" +#include "access/slru.h" +#include "access/transam.h"  #include "access/xact.h" -#include "catalog/pg_listener.h" +#include "catalog/pg_database.h"  #include "commands/async.h" +#include "funcapi.h"  #include "libpq/libpq.h"  #include "libpq/pqformat.h"  #include "miscadmin.h"  #include "storage/ipc.h" +#include "storage/lmgr.h"  #include "storage/procsignal.h"  #include "storage/sinval.h"  #include "tcop/tcopprot.h"  #include "utils/builtins.h" -#include "utils/fmgroids.h"  #include "utils/memutils.h"  #include "utils/ps_status.h" -#include "utils/tqual.h"  /* + * Maximum size of a NOTIFY payload, including terminating NULL.  This + * must be kept small enough so that a notification message fits on one + * SLRU page. + */ +#define NOTIFY_PAYLOAD_MAX_LENGTH	8000 + +/* + * Struct representing an entry in the global notify queue + * + * This struct declaration has the maximal length, but in a real queue entry + * the data area is only big enough for the actual channel and payload strings + * (each null-terminated).  AsyncQueueEntryEmptySize is the minimum possible + * entry size, if both channel and payload strings are empty (but note it + * doesn't include alignment padding). + * + * The "length" field should always be rounded up to the next QUEUEALIGN + * multiple so that all fields are properly aligned. + */ +typedef struct AsyncQueueEntry +{ +	int				length;		/* total allocated length of entry */ +	Oid				dboid;		/* sender's database OID */ +	TransactionId	xid;		/* sender's XID */ +	int32			srcPid;		/* sender's PID */ +	char			data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH]; +} AsyncQueueEntry; + +/* Currently, no field of AsyncQueueEntry requires more than int alignment */ +#define QUEUEALIGN(len)		INTALIGN(len) + +#define AsyncQueueEntryEmptySize	(offsetof(AsyncQueueEntry, data) + 2) + +/* + * Struct describing a queue position, and assorted macros for working with it + */ +typedef struct QueuePosition +{ +	int				page;		/* SLRU page number */ +	int				offset;		/* byte offset within page */ +} QueuePosition; + +#define QUEUE_POS_PAGE(x)		((x).page) +#define QUEUE_POS_OFFSET(x)		((x).offset) + +#define SET_QUEUE_POS(x,y,z) \ +	do { \ +		(x).page = (y); \ +		(x).offset = (z); \ +	} while (0) + +#define QUEUE_POS_EQUAL(x,y) \ +	 ((x).page == (y).page && (x).offset == (y).offset) + +/* choose logically smaller QueuePosition */ +#define QUEUE_POS_MIN(x,y) \ +	(asyncQueuePagePrecedesLogically((x).page, (y).page) ? (x) : \ +	 (x).page != (y).page ? (y) : \ +	 (x).offset < (y).offset ? (x) : (y)) + +/* + * Struct describing a listening backend's status + */ +typedef struct QueueBackendStatus +{ +	int32			pid;		/* either a PID or InvalidPid */ +	QueuePosition	pos;		/* backend has read queue up to here */ +} QueueBackendStatus; + +#define	InvalidPid				(-1) + +/* + * Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff) + * + * The AsyncQueueControl structure is protected by the AsyncQueueLock. + * + * When holding the lock in SHARED mode, backends may only inspect their own + * entries as well as the head and tail pointers. Consequently we can allow a + * backend to update its own record while holding only SHARED lock (since no + * other backend will inspect it). + * + * When holding the lock in EXCLUSIVE mode, backends can inspect the entries + * of other backends and also change the head and tail pointers. + * + * In order to avoid deadlocks, whenever we need both locks, we always first + * get AsyncQueueLock and then AsyncCtlLock. + * + * Each backend uses the backend[] array entry with index equal to its + * BackendId (which can range from 1 to MaxBackends).  We rely on this to make + * SendProcSignal fast. + */ +typedef struct AsyncQueueControl +{ +	QueuePosition		head;		/* head points to the next free location */ +	QueuePosition 		tail;		/* the global tail is equivalent to the +									   tail of the "slowest" backend */ +	TimestampTz			lastQueueFillWarn;	/* time of last queue-full msg */ +	QueueBackendStatus	backend[1];	/* actually of length MaxBackends+1 */ +	/* DO NOT ADD FURTHER STRUCT MEMBERS HERE */ +} AsyncQueueControl; + +static AsyncQueueControl   *asyncQueueControl; + +#define QUEUE_HEAD					(asyncQueueControl->head) +#define QUEUE_TAIL					(asyncQueueControl->tail) +#define QUEUE_BACKEND_PID(i)		(asyncQueueControl->backend[i].pid) +#define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos) + +/* + * The SLRU buffer area through which we access the notification queue + */ +static SlruCtlData			AsyncCtlData; + +#define AsyncCtl					(&AsyncCtlData) +#define QUEUE_PAGESIZE				BLCKSZ +#define QUEUE_FULL_WARN_INTERVAL	5000	/* warn at most once every 5s */ + +/* + * slru.c currently assumes that all filenames are four characters of hex + * digits. That means that we can use segments 0000 through FFFF. + * Each segment contains SLRU_PAGES_PER_SEGMENT pages which gives us + * the pages from 0 to SLRU_PAGES_PER_SEGMENT * 0x10000 - 1. + * + * It's of course possible to enhance slru.c, but this gives us so much + * space already that it doesn't seem worth the trouble. + * + * The most data we can have in the queue at a time is QUEUE_MAX_PAGE/2 + * pages, because more than that would confuse slru.c into thinking there + * was a wraparound condition.  With the default BLCKSZ this means there + * can be up to 8GB of queued-and-not-read data. + * + * Note: it's possible to redefine QUEUE_MAX_PAGE with a smaller multiple of + * SLRU_PAGES_PER_SEGMENT, for easier testing of queue-full behaviour. + */ +#define QUEUE_MAX_PAGE			(SLRU_PAGES_PER_SEGMENT * 0x10000 - 1) + +/* + * listenChannels identifies the channels we are actually listening to + * (ie, have committed a LISTEN on).  It is a simple list of channel names, + * allocated in TopMemoryContext. + */ +static List *listenChannels = NIL;		/* list of C strings */ + +/*   * State for pending LISTEN/UNLISTEN actions consists of an ordered list of   * all actions requested in the current transaction.  As explained above, - * we don't actually modify pg_listener until we reach transaction commit. + * we don't actually change listenChannels until we reach transaction commit.   *   * The list is kept in CurTransactionContext.  In subtransactions, each   * subtransaction has its own list in its own CurTransactionContext, but @@ -126,7 +297,7 @@ typedef enum  typedef struct  {  	ListenActionKind action; -	char		condname[1];	/* actually, as long as needed */ +	char		channel[1];		/* actually, as long as needed */  } ListenAction;  static List *pendingActions = NIL;		/* list of ListenAction */ @@ -134,9 +305,9 @@ static List *pendingActions = NIL;		/* list of ListenAction */  static List *upperPendingActions = NIL; /* list of upper-xact lists */  /* - * State for outbound notifies consists of a list of all relnames NOTIFYed - * in the current transaction.	We do not actually perform a NOTIFY until - * and unless the transaction commits.	pendingNotifies is NIL if no + * State for outbound notifies consists of a list of all channels+payloads + * NOTIFYed in the current transaction.	We do not actually perform a NOTIFY + * until and unless the transaction commits.  pendingNotifies is NIL if no   * NOTIFYs have been done in the current transaction.   *   * The list is kept in CurTransactionContext.  In subtransactions, each @@ -149,12 +320,18 @@ static List *upperPendingActions = NIL; /* list of upper-xact lists */   * condition name, it will get a self-notify at commit.  This is a bit odd   * but is consistent with our historical behavior.   */ -static List *pendingNotifies = NIL;		/* list of C strings */ +typedef struct Notification +{ +	char		   *channel;	/* channel name */ +	char		   *payload;	/* payload string (can be empty) */ +} Notification; + +static List *pendingNotifies = NIL;				/* list of Notifications */  static List *upperPendingNotifies = NIL;		/* list of upper-xact lists */  /* - * State for inbound notifies consists of two flags: one saying whether + * State for inbound notifications consists of two flags: one saying whether   * the signal handler is currently allowed to call ProcessIncomingNotify   * directly, and one saying whether the signal has occurred but the handler   * was not allowed to call ProcessIncomingNotify at the time. @@ -168,57 +345,259 @@ static volatile sig_atomic_t notifyInterruptOccurred = 0;  /* True if we've registered an on_shmem_exit cleanup */  static bool unlistenExitRegistered = false; +/* has this backend sent notifications in the current transaction? */ +static bool backendHasSentNotifications = false; +/* has this backend executed its first LISTEN in the current transaction? */ +static bool backendHasExecutedInitialListen = false; +/* GUC parameter */  bool		Trace_notify = false; - -static void queue_listen(ListenActionKind action, const char *condname); +/* local function prototypes */ +static bool asyncQueuePagePrecedesPhysically(int p, int q); +static bool asyncQueuePagePrecedesLogically(int p, int q); +static void queue_listen(ListenActionKind action, const char *channel);  static void Async_UnlistenOnExit(int code, Datum arg); -static void Exec_Listen(Relation lRel, const char *relname); -static void Exec_Unlisten(Relation lRel, const char *relname); -static void Exec_UnlistenAll(Relation lRel); -static void Send_Notify(Relation lRel); +static void Exec_ListenPreCommit(void); +static void Exec_ListenCommit(const char *channel); +static void Exec_UnlistenCommit(const char *channel); +static void Exec_UnlistenAllCommit(void); +static bool IsListeningOn(const char *channel); +static void asyncQueueUnregister(void); +static bool asyncQueueIsFull(void); +static bool asyncQueueAdvance(QueuePosition *position, int entryLength); +static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe); +static ListCell *asyncQueueAddEntries(ListCell *nextNotify); +static void asyncQueueFillWarning(void); +static bool SignalBackends(void); +static void asyncQueueReadAllNotifications(void); +static bool asyncQueueProcessPageEntries(QueuePosition *current, +							 QueuePosition stop, +							 char *page_buffer); +static void asyncQueueAdvanceTail(void);  static void ProcessIncomingNotify(void); -static void NotifyMyFrontEnd(char *relname, int32 listenerPID); -static bool AsyncExistsPendingNotify(const char *relname); +static void NotifyMyFrontEnd(const char *channel, +							 const char *payload, +							 int32 srcPid); +static bool AsyncExistsPendingNotify(const char *channel, const char *payload);  static void ClearPendingActionsAndNotifies(void);  /* + * We will work on the page range of 0..QUEUE_MAX_PAGE. + * + * asyncQueuePagePrecedesPhysically just checks numerically without any magic + * if one page precedes another one.  This is wrong for normal operation but + * is helpful when clearing pg_notify/ during startup. + * + * asyncQueuePagePrecedesLogically compares using wraparound logic, as is + * required by slru.c. + */ +static bool +asyncQueuePagePrecedesPhysically(int p, int q) +{ +	return p < q; +} + +static bool +asyncQueuePagePrecedesLogically(int p, int q) +{ +	int			diff; + +	/* +	 * We have to compare modulo (QUEUE_MAX_PAGE+1)/2.  Both inputs should +	 * be in the range 0..QUEUE_MAX_PAGE. +	 */ +	Assert(p >= 0 && p <= QUEUE_MAX_PAGE); +	Assert(q >= 0 && q <= QUEUE_MAX_PAGE); + +	diff = p - q; +	if (diff >= ((QUEUE_MAX_PAGE+1)/2)) +		diff -= QUEUE_MAX_PAGE+1; +	else if (diff < -((QUEUE_MAX_PAGE+1)/2)) +		diff += QUEUE_MAX_PAGE+1; +	return diff < 0; +} + +/* + * Report space needed for our shared memory area + */ +Size +AsyncShmemSize(void) +{ +	Size	size; + +	/* This had better match AsyncShmemInit */ +	size = mul_size(MaxBackends, sizeof(QueueBackendStatus)); +	size = add_size(size, sizeof(AsyncQueueControl)); + +	size = add_size(size, SimpleLruShmemSize(NUM_ASYNC_BUFFERS, 0)); + +	return size; +} + +/* + * Initialize our shared memory area + */ +void +AsyncShmemInit(void) +{ +	bool	found; +	int		slotno; +	Size	size; + +	/* +	 * Create or attach to the AsyncQueueControl structure. +	 * +	 * The used entries in the backend[] array run from 1 to MaxBackends. +	 * sizeof(AsyncQueueControl) already includes space for the unused zero'th +	 * entry, but we need to add on space for the used entries. +	 */ +	size = mul_size(MaxBackends, sizeof(QueueBackendStatus)); +	size = add_size(size, sizeof(AsyncQueueControl)); + +	asyncQueueControl = (AsyncQueueControl *) +		ShmemInitStruct("Async Queue Control", size, &found); + +	if (!asyncQueueControl) +		elog(ERROR, "out of shared memory"); + +	if (!found) +	{ +		/* First time through, so initialize it */ +		int		i; + +		SET_QUEUE_POS(QUEUE_HEAD, 0, 0); +		SET_QUEUE_POS(QUEUE_TAIL, 0, 0); +		asyncQueueControl->lastQueueFillWarn = 0; +		/* zero'th entry won't be used, but let's initialize it anyway */ +		for (i = 0; i <= MaxBackends; i++) +		{ +			QUEUE_BACKEND_PID(i) = InvalidPid; +			SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0); +		} +	} + +	/* +	 * Set up SLRU management of the pg_notify data. +	 */ +	AsyncCtl->PagePrecedes = asyncQueuePagePrecedesLogically; +	SimpleLruInit(AsyncCtl, "Async Ctl", NUM_ASYNC_BUFFERS, 0, +				  AsyncCtlLock, "pg_notify"); +	/* Override default assumption that writes should be fsync'd */ +	AsyncCtl->do_fsync = false; + +	if (!found) +	{ +		/* +		 * During start or reboot, clean out the pg_notify directory. +		 * +		 * Since we want to remove every file, we temporarily use +		 * asyncQueuePagePrecedesPhysically() and pass INT_MAX as the +		 * comparison value; every file in the directory should therefore +		 * appear to be less than that. +		 */ +		AsyncCtl->PagePrecedes = asyncQueuePagePrecedesPhysically; +		(void) SlruScanDirectory(AsyncCtl, INT_MAX, true); +		AsyncCtl->PagePrecedes = asyncQueuePagePrecedesLogically; + +		/* Now initialize page zero to empty */ +		LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE); +		slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD)); +		/* This write is just to verify that pg_notify/ is writable */ +		SimpleLruWritePage(AsyncCtl, slotno, NULL); +		LWLockRelease(AsyncCtlLock); +	} +} + + +/* + * pg_notify - + *	  SQL function to send a notification event + */ +Datum +pg_notify(PG_FUNCTION_ARGS) +{ +	const char *channel; +	const char *payload; + +	if (PG_ARGISNULL(0)) +		channel = ""; +	else +		channel = text_to_cstring(PG_GETARG_TEXT_PP(0)); + +	if (PG_ARGISNULL(1)) +		payload = ""; +	else +		payload = text_to_cstring(PG_GETARG_TEXT_PP(1)); + +	Async_Notify(channel, payload); + +	PG_RETURN_VOID(); +} + + +/*   * Async_Notify   *   *		This is executed by the SQL notify command.   * - *		Adds the relation to the list of pending notifies. + *		Adds the message to the list of pending notifies.   *		Actual notification happens during transaction commit.   *		^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^   */  void -Async_Notify(const char *relname) +Async_Notify(const char *channel, const char *payload)  { +	Notification *n; +	MemoryContext oldcontext; +  	if (Trace_notify) -		elog(DEBUG1, "Async_Notify(%s)", relname); +		elog(DEBUG1, "Async_Notify(%s)", channel); -	/* no point in making duplicate entries in the list ... */ -	if (!AsyncExistsPendingNotify(relname)) +	/* a channel name must be specified */ +	if (!channel || !strlen(channel)) +		ereport(ERROR, +				(errcode(ERRCODE_INVALID_PARAMETER_VALUE), +				 errmsg("channel name cannot be empty"))); + +	if (strlen(channel) >= NAMEDATALEN) +		ereport(ERROR, +				(errcode(ERRCODE_INVALID_PARAMETER_VALUE), +				 errmsg("channel name too long"))); + +	if (payload)  	{ -		/* -		 * The name list needs to live until end of transaction, so store it -		 * in the transaction context. -		 */ -		MemoryContext oldcontext; +		if (strlen(payload) >= NOTIFY_PAYLOAD_MAX_LENGTH) +			ereport(ERROR, +					(errcode(ERRCODE_INVALID_PARAMETER_VALUE), +					 errmsg("payload string too long"))); +	} -		oldcontext = MemoryContextSwitchTo(CurTransactionContext); +	/* no point in making duplicate entries in the list ... */ +	if (AsyncExistsPendingNotify(channel, payload)) +		return; -		/* -		 * Ordering of the list isn't important.  We choose to put new entries -		 * on the front, as this might make duplicate-elimination a tad faster -		 * when the same condition is signaled many times in a row. -		 */ -		pendingNotifies = lcons(pstrdup(relname), pendingNotifies); +	/* +	 * The notification list needs to live until end of transaction, so store +	 * it in the transaction context. +	 */ +	oldcontext = MemoryContextSwitchTo(CurTransactionContext); -		MemoryContextSwitchTo(oldcontext); -	} +	n = (Notification *) palloc(sizeof(Notification)); +	n->channel = pstrdup(channel); +	if (payload) +		n->payload = pstrdup(payload); +	else +		n->payload = ""; + +	/* +	 * We want to preserve the order so we need to append every +	 * notification. See comments at AsyncExistsPendingNotify(). +	 */ +	pendingNotifies = lappend(pendingNotifies, n); + +	MemoryContextSwitchTo(oldcontext);  }  /* @@ -226,11 +605,11 @@ Async_Notify(const char *relname)   *		Common code for listen, unlisten, unlisten all commands.   *   *		Adds the request to the list of pending actions. - *		Actual update of pg_listener happens during transaction commit. - *		^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + *		Actual update of the listenChannels list happens during transaction + *		commit.   */  static void -queue_listen(ListenActionKind action, const char *condname) +queue_listen(ListenActionKind action, const char *channel)  {  	MemoryContext oldcontext;  	ListenAction *actrec; @@ -244,9 +623,9 @@ queue_listen(ListenActionKind action, const char *condname)  	oldcontext = MemoryContextSwitchTo(CurTransactionContext);  	/* space for terminating null is included in sizeof(ListenAction) */ -	actrec = (ListenAction *) palloc(sizeof(ListenAction) + strlen(condname)); +	actrec = (ListenAction *) palloc(sizeof(ListenAction) + strlen(channel));  	actrec->action = action; -	strcpy(actrec->condname, condname); +	strcpy(actrec->channel, channel);  	pendingActions = lappend(pendingActions, actrec); @@ -259,12 +638,12 @@ queue_listen(ListenActionKind action, const char *condname)   *		This is executed by the SQL listen command.   */  void -Async_Listen(const char *relname) +Async_Listen(const char *channel)  {  	if (Trace_notify) -		elog(DEBUG1, "Async_Listen(%s,%d)", relname, MyProcPid); +		elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid); -	queue_listen(LISTEN_LISTEN, relname); +	queue_listen(LISTEN_LISTEN, channel);  }  /* @@ -273,16 +652,16 @@ Async_Listen(const char *relname)   *		This is executed by the SQL unlisten command.   */  void -Async_Unlisten(const char *relname) +Async_Unlisten(const char *channel)  {  	if (Trace_notify) -		elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, MyProcPid); +		elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);  	/* If we couldn't possibly be listening, no need to queue anything */  	if (pendingActions == NIL && !unlistenExitRegistered)  		return; -	queue_listen(LISTEN_UNLISTEN, relname); +	queue_listen(LISTEN_UNLISTEN, channel);  }  /* @@ -304,28 +683,63 @@ Async_UnlistenAll(void)  }  /* - * Async_UnlistenOnExit + * SQL function: return a set of the channel names this backend is actively + * listening to.   * - *		Clean up the pg_listener table at backend exit. + * Note: this coding relies on the fact that the listenChannels list cannot + * change within a transaction. + */ +Datum +pg_listening_channels(PG_FUNCTION_ARGS) +{ +	FuncCallContext	   *funcctx; +	ListCell		  **lcp; + +	/* stuff done only on the first call of the function */ +	if (SRF_IS_FIRSTCALL()) +	{ +		MemoryContext	oldcontext; + +		/* create a function context for cross-call persistence */ +		funcctx = SRF_FIRSTCALL_INIT(); + +		/* switch to memory context appropriate for multiple function calls */ +		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + +		/* allocate memory for user context */ +		lcp = (ListCell **) palloc(sizeof(ListCell *)); +		*lcp = list_head(listenChannels); +		funcctx->user_fctx = (void *) lcp; + +		MemoryContextSwitchTo(oldcontext); +	} + +	/* stuff done on every call of the function */ +	funcctx = SRF_PERCALL_SETUP(); +	lcp = (ListCell **) funcctx->user_fctx; + +	while (*lcp != NULL) +	{ +		char   *channel = (char *) lfirst(*lcp); + +		*lcp = lnext(*lcp); +		SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel)); +	} + +	SRF_RETURN_DONE(funcctx); +} + +/* + * Async_UnlistenOnExit   * - *		This is executed if we have done any LISTENs in this backend. - *		It might not be necessary anymore, if the user UNLISTENed everything, - *		but we don't try to detect that case. + * This is executed at backend exit if we have done any LISTENs in this + * backend.  It might not be necessary anymore, if the user UNLISTENed + * everything, but we don't try to detect that case.   */  static void  Async_UnlistenOnExit(int code, Datum arg)  { -	/* -	 * We need to start/commit a transaction for the unlisten, but if there is -	 * already an active transaction we had better abort that one first. -	 * Otherwise we'd end up committing changes that probably ought to be -	 * discarded. -	 */ -	AbortOutOfAnyTransaction(); -	/* Now we can do the unlisten */ -	StartTransactionCommand(); -	Async_UnlistenAll(); -	CommitTransactionCommand(); +	Exec_UnlistenAllCommit();  }  /* @@ -337,72 +751,42 @@ Async_UnlistenOnExit(int code, Datum arg)  void  AtPrepare_Notify(void)  { -	ListCell   *p; - -	/* It's not sensible to have any pending LISTEN/UNLISTEN actions */ -	if (pendingActions) +	/* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */ +	if (pendingActions || pendingNotifies)  		ereport(ERROR,  				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), -				 errmsg("cannot PREPARE a transaction that has executed LISTEN or UNLISTEN"))); - -	/* We can deal with pending NOTIFY though */ -	foreach(p, pendingNotifies) -	{ -		const char *relname = (const char *) lfirst(p); - -		RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, 0, -							   relname, strlen(relname) + 1); -	} - -	/* -	 * We can clear the state immediately, rather than needing a separate -	 * PostPrepare call, because if the transaction fails we'd just discard -	 * the state anyway. -	 */ -	ClearPendingActionsAndNotifies(); +				 errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN or NOTIFY")));  }  /* - * AtCommit_Notify + * PreCommit_Notify   * - *		This is called at transaction commit. + *		This is called at transaction commit, before actually committing to + *		clog.   * - *		If there are pending LISTEN/UNLISTEN actions, insert or delete - *		tuples in pg_listener accordingly. + *		If there are pending LISTEN actions, make sure we are listed in the + *		shared-memory listener array.  This must happen before commit to + *		ensure we don't miss any notifies from transactions that commit + *		just after ours.   *   *		If there are outbound notify requests in the pendingNotifies list, - *		scan pg_listener for matching tuples, and either signal the other - *		backend or send a message to our own frontend. - * - *		NOTE: we are still inside the current transaction, therefore can - *		piggyback on its committing of changes. + *		add them to the global queue.  We do that before commit so that + *		we can still throw error if we run out of queue space.   */  void -AtCommit_Notify(void) +PreCommit_Notify(void)  { -	Relation	lRel;  	ListCell   *p;  	if (pendingActions == NIL && pendingNotifies == NIL)  		return;					/* no relevant statements in this xact */ -	/* -	 * NOTIFY is disabled if not normal processing mode. This test used to be -	 * in xact.c, but it seems cleaner to do it here. -	 */ -	if (!IsNormalProcessingMode()) -	{ -		ClearPendingActionsAndNotifies(); -		return; -	} -  	if (Trace_notify) -		elog(DEBUG1, "AtCommit_Notify"); +		elog(DEBUG1, "PreCommit_Notify"); -	/* Acquire ExclusiveLock on pg_listener */ -	lRel = heap_open(ListenerRelationId, ExclusiveLock); +	Assert(backendHasExecutedInitialListen == false); -	/* Perform any pending listen/unlisten actions */ +	/* Preflight for any pending listen/unlisten actions */  	foreach(p, pendingActions)  	{  		ListenAction *actrec = (ListenAction *) lfirst(p); @@ -410,281 +794,743 @@ AtCommit_Notify(void)  		switch (actrec->action)  		{  			case LISTEN_LISTEN: -				Exec_Listen(lRel, actrec->condname); +				Exec_ListenPreCommit();  				break;  			case LISTEN_UNLISTEN: -				Exec_Unlisten(lRel, actrec->condname); +				/* there is no Exec_UnlistenPreCommit() */  				break;  			case LISTEN_UNLISTEN_ALL: -				Exec_UnlistenAll(lRel); +				/* there is no Exec_UnlistenAllPreCommit() */  				break;  		} - -		/* We must CCI after each action in case of conflicting actions */ -		CommandCounterIncrement();  	} -	/* Perform any pending notifies */ +	/* Queue any pending notifies */  	if (pendingNotifies) -		Send_Notify(lRel); +	{ +		ListCell   *nextNotify; -	/* -	 * We do NOT release the lock on pg_listener here; we need to hold it -	 * until end of transaction (which is about to happen, anyway) to ensure -	 * that notified backends see our tuple updates when they look. Else they -	 * might disregard the signal, which would make the application programmer -	 * very unhappy.  Also, this prevents race conditions when we have just -	 * inserted a listening tuple. -	 */ -	heap_close(lRel, NoLock); +		/* +		 * Make sure that we have an XID assigned to the current transaction. +		 * GetCurrentTransactionId is cheap if we already have an XID, but +		 * not so cheap if we don't, and we'd prefer not to do that work +		 * while holding AsyncQueueLock. +		 */ +		(void) GetCurrentTransactionId(); -	ClearPendingActionsAndNotifies(); +		/* +		 * Serialize writers by acquiring a special lock that we hold till +		 * after commit.  This ensures that queue entries appear in commit +		 * order, and in particular that there are never uncommitted queue +		 * entries ahead of committed ones, so an uncommitted transaction +		 * can't block delivery of deliverable notifications. +		 * +		 * We use a heavyweight lock so that it'll automatically be released +		 * after either commit or abort.  This also allows deadlocks to be +		 * detected, though really a deadlock shouldn't be possible here. +		 * +		 * The lock is on "database 0", which is pretty ugly but it doesn't +		 * seem worth inventing a special locktag category just for this. +		 * (Historical note: before PG 9.0, a similar lock on "database 0" was +		 * used by the flatfiles mechanism.) +		 */ +		LockSharedObject(DatabaseRelationId, InvalidOid, 0, +						 AccessExclusiveLock); -	if (Trace_notify) -		elog(DEBUG1, "AtCommit_Notify: done"); +		/* Now push the notifications into the queue */ +		backendHasSentNotifications = true; + +		nextNotify = list_head(pendingNotifies); +		while (nextNotify != NULL) +		{ +			/* +			 * Add the pending notifications to the queue.  We acquire and +			 * release AsyncQueueLock once per page, which might be overkill +			 * but it does allow readers to get in while we're doing this. +			 * +			 * A full queue is very uncommon and should really not happen, +			 * given that we have so much space available in the SLRU pages. +			 * Nevertheless we need to deal with this possibility. Note that +			 * when we get here we are in the process of committing our +			 * transaction, but we have not yet committed to clog, so at this +			 * point in time we can still roll the transaction back. +			 */ +			LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE); +			asyncQueueFillWarning(); +			if (asyncQueueIsFull()) +				ereport(ERROR, +						(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), +						 errmsg("too many notifications in the NOTIFY queue"))); +			nextNotify = asyncQueueAddEntries(nextNotify); +			LWLockRelease(AsyncQueueLock); +		} +	}  }  /* - * Exec_Listen --- subroutine for AtCommit_Notify + * AtCommit_Notify + * + *		This is called at transaction commit, after committing to clog.   * - *		Register the current backend as listening on the specified relation. + *		Update listenChannels and clear transaction-local state.   */ -static void -Exec_Listen(Relation lRel, const char *relname) +void +AtCommit_Notify(void)  { -	HeapScanDesc scan; -	HeapTuple	tuple; -	Datum		values[Natts_pg_listener]; -	bool		nulls[Natts_pg_listener]; -	NameData	condname; -	bool		alreadyListener = false; +	ListCell   *p; + +	/* +	 * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to +	 * return as soon as possible +	 */ +	if (!pendingActions && !pendingNotifies) +		return;  	if (Trace_notify) -		elog(DEBUG1, "Exec_Listen(%s,%d)", relname, MyProcPid); +		elog(DEBUG1, "AtCommit_Notify"); -	/* Detect whether we are already listening on this relname */ -	scan = heap_beginscan(lRel, SnapshotNow, 0, NULL); -	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) +	/* Perform any pending listen/unlisten actions */ +	foreach(p, pendingActions)  	{ -		Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple); +		ListenAction *actrec = (ListenAction *) lfirst(p); -		if (listener->listenerpid == MyProcPid && -			strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0) +		switch (actrec->action)  		{ -			alreadyListener = true; -			/* No need to scan the rest of the table */ -			break; +			case LISTEN_LISTEN: +				Exec_ListenCommit(actrec->channel); +				break; +			case LISTEN_UNLISTEN: +				Exec_UnlistenCommit(actrec->channel); +				break; +			case LISTEN_UNLISTEN_ALL: +				Exec_UnlistenAllCommit(); +				break;  		}  	} -	heap_endscan(scan); - -	if (alreadyListener) -		return;  	/* -	 * OK to insert a new tuple +	 * If we did an initial LISTEN, listenChannels now has the entry, so +	 * we no longer need or want the flag to be set.  	 */ -	memset(nulls, false, sizeof(nulls)); - -	namestrcpy(&condname, relname); -	values[Anum_pg_listener_relname - 1] = NameGetDatum(&condname); -	values[Anum_pg_listener_listenerpid - 1] = Int32GetDatum(MyProcPid); -	values[Anum_pg_listener_notification - 1] = Int32GetDatum(0);		/* no notifies pending */ +	backendHasExecutedInitialListen = false; -	tuple = heap_form_tuple(RelationGetDescr(lRel), values, nulls); +	/* And clean up */ +	ClearPendingActionsAndNotifies(); +} -	simple_heap_insert(lRel, tuple); +/* + * Exec_ListenPreCommit --- subroutine for PreCommit_Notify + * + * This function must make sure we are ready to catch any incoming messages. + */ +static void +Exec_ListenPreCommit(void) +{ +	/* +	 * Nothing to do if we are already listening to something, nor if we +	 * already ran this routine in this transaction. +	 */ +	if (listenChannels != NIL || backendHasExecutedInitialListen) +		return; -#ifdef NOT_USED					/* currently there are no indexes */ -	CatalogUpdateIndexes(lRel, tuple); -#endif +	if (Trace_notify) +		elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid); -	heap_freetuple(tuple); +	/* +	 * We need this variable to detect an aborted initial LISTEN. +	 * In that case we would set up our pointer but not listen on any channel. +	 * This flag gets cleared in AtCommit_Notify or AtAbort_Notify(). +	 */ +	backendHasExecutedInitialListen = true;  	/* -	 * now that we are listening, make sure we will unlisten before dying. +	 * Before registering, make sure we will unlisten before dying. +	 * (Note: this action does not get undone if we abort later.)  	 */  	if (!unlistenExitRegistered)  	{  		on_shmem_exit(Async_UnlistenOnExit, 0);  		unlistenExitRegistered = true;  	} + +	/* +	 * This is our first LISTEN, so establish our pointer. +	 * +	 * We set our pointer to the global tail pointer and then move it forward +	 * over already-committed notifications.  This ensures we cannot miss any +	 * not-yet-committed notifications.  We might get a few more but that +	 * doesn't hurt. +	 */ +	LWLockAcquire(AsyncQueueLock, LW_SHARED); +	QUEUE_BACKEND_POS(MyBackendId) = QUEUE_TAIL; +	QUEUE_BACKEND_PID(MyBackendId) = MyProcPid; +	LWLockRelease(AsyncQueueLock); + +	/* +	 * Try to move our pointer forward as far as possible. This will skip over +	 * already-committed notifications. Still, we could get notifications that +	 * have already committed before we started to LISTEN. +	 * +	 * Note that we are not yet listening on anything, so we won't deliver +	 * any notification to the frontend. +	 * +	 * This will also advance the global tail pointer if possible. +	 */ +	asyncQueueReadAllNotifications();  }  /* - * Exec_Unlisten --- subroutine for AtCommit_Notify + * Exec_ListenCommit --- subroutine for AtCommit_Notify   * - *		Remove the current backend from the list of listening backends - *		for the specified relation. + * Add the channel to the list of channels we are listening on.   */  static void -Exec_Unlisten(Relation lRel, const char *relname) +Exec_ListenCommit(const char *channel)  { -	HeapScanDesc scan; -	HeapTuple	tuple; +	MemoryContext oldcontext; + +	/* Do nothing if we are already listening on this channel */ +	if (IsListeningOn(channel)) +		return; + +	/* +	 * Add the new channel name to listenChannels. +	 * +	 * XXX It is theoretically possible to get an out-of-memory failure here, +	 * which would be bad because we already committed.  For the moment it +	 * doesn't seem worth trying to guard against that, but maybe improve this +	 * later. +	 */ +	oldcontext = MemoryContextSwitchTo(TopMemoryContext); +	listenChannels = lappend(listenChannels, pstrdup(channel)); +	MemoryContextSwitchTo(oldcontext); +} + +/* + * Exec_UnlistenCommit --- subroutine for AtCommit_Notify + * + * Remove the specified channel name from listenChannels. + */ +static void +Exec_UnlistenCommit(const char *channel) +{ +	ListCell *q; +	ListCell *prev;  	if (Trace_notify) -		elog(DEBUG1, "Exec_Unlisten(%s,%d)", relname, MyProcPid); +		elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid); -	scan = heap_beginscan(lRel, SnapshotNow, 0, NULL); -	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) +	prev = NULL; +	foreach(q, listenChannels)  	{ -		Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple); +		char *lchan = (char *) lfirst(q); -		if (listener->listenerpid == MyProcPid && -			strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0) +		if (strcmp(lchan, channel) == 0)  		{ -			/* Found the matching tuple, delete it */ -			simple_heap_delete(lRel, &tuple->t_self); - -			/* -			 * We assume there can be only one match, so no need to scan the -			 * rest of the table -			 */ +			listenChannels = list_delete_cell(listenChannels, q, prev); +			pfree(lchan);  			break;  		} +		prev = q;  	} -	heap_endscan(scan);  	/*  	 * We do not complain about unlistening something not being listened;  	 * should we?  	 */ + +	/* If no longer listening to anything, get out of listener array */ +	if (listenChannels == NIL) +		asyncQueueUnregister();  }  /* - * Exec_UnlistenAll --- subroutine for AtCommit_Notify + * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify   * - *		Update pg_listener to unlisten all relations for this backend. + *		Unlisten on all channels for this backend.   */  static void -Exec_UnlistenAll(Relation lRel) +Exec_UnlistenAllCommit(void) +{ +	if (Trace_notify) +		elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid); + +	list_free_deep(listenChannels); +	listenChannels = NIL; + +	asyncQueueUnregister(); +} + +/* + * ProcessCompletedNotifies --- send out signals and self-notifies + * + * This is called from postgres.c just before going idle at the completion + * of a transaction.  If we issued any notifications in the just-completed + * transaction, send signals to other backends to process them, and also + * process the queue ourselves to send messages to our own frontend. + * + * The reason that this is not done in AtCommit_Notify is that there is + * a nonzero chance of errors here (for example, encoding conversion errors + * while trying to format messages to our frontend).  An error during + * AtCommit_Notify would be a PANIC condition.  The timing is also arranged + * to ensure that a transaction's self-notifies are delivered to the frontend + * before it gets the terminating ReadyForQuery message. + * + * Note that we send signals and process the queue even if the transaction + * eventually aborted.  This is because we need to clean out whatever got + * added to the queue. + * + * NOTE: we are outside of any transaction here. + */ +void +ProcessCompletedNotifies(void)  { -	HeapScanDesc scan; -	HeapTuple	lTuple; -	ScanKeyData key[1]; +	bool		signalled; + +	/* Nothing to do if we didn't send any notifications */ +	if (!backendHasSentNotifications) +		return; + +	/* +	 * We reset the flag immediately; otherwise, if any sort of error +	 * occurs below, we'd be locked up in an infinite loop, because +	 * control will come right back here after error cleanup. +	 */ +	backendHasSentNotifications = false;  	if (Trace_notify) -		elog(DEBUG1, "Exec_UnlistenAll"); +		elog(DEBUG1, "ProcessCompletedNotifies"); + +	/* +	 * We must run asyncQueueReadAllNotifications inside a transaction, +	 * else bad things happen if it gets an error. +	 */ +	StartTransactionCommand(); + +	/* Send signals to other backends */ +	signalled = SignalBackends(); -	/* Find and delete all entries with my listenerPID */ -	ScanKeyInit(&key[0], -				Anum_pg_listener_listenerpid, -				BTEqualStrategyNumber, F_INT4EQ, -				Int32GetDatum(MyProcPid)); -	scan = heap_beginscan(lRel, SnapshotNow, 1, key); +	if (listenChannels != NIL) +	{ +		/* Read the queue ourselves, and send relevant stuff to the frontend */ +		asyncQueueReadAllNotifications(); +	} +	else if (!signalled) +	{ +		/* +		 * If we found no other listening backends, and we aren't listening +		 * ourselves, then we must execute asyncQueueAdvanceTail to flush +		 * the queue, because ain't nobody else gonna do it.  This prevents +		 * queue overflow when we're sending useless notifies to nobody. +		 * (A new listener could have joined since we looked, but if so this +		 * is harmless.) +		 */ +		asyncQueueAdvanceTail(); +	} -	while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) -		simple_heap_delete(lRel, &lTuple->t_self); +	CommitTransactionCommand(); -	heap_endscan(scan); +	/* We don't need pq_flush() here since postgres.c will do one shortly */  }  /* - * Send_Notify --- subroutine for AtCommit_Notify + * Test whether we are actively listening on the given channel name.   * - *		Scan pg_listener for tuples matching our pending notifies, and - *		either signal the other backend or send a message to our own frontend. + * Note: this function is executed for every notification found in the queue. + * Perhaps it is worth further optimization, eg convert the list to a sorted + * array so we can binary-search it.  In practice the list is likely to be + * fairly short, though. + */ +static bool +IsListeningOn(const char *channel) +{ +	ListCell   *p; + +	foreach(p, listenChannels) +	{ +		char	   *lchan = (char *) lfirst(p); + +		if (strcmp(lchan, channel) == 0) +			return true; +	} +	return false; +} + +/* + * Remove our entry from the listeners array when we are no longer listening + * on any channel.  NB: must not fail if we're already not listening.   */  static void -Send_Notify(Relation lRel) +asyncQueueUnregister(void) +{ +	bool	  advanceTail; + +	Assert(listenChannels == NIL);				/* else caller error */ + +	LWLockAcquire(AsyncQueueLock, LW_SHARED); +	/* check if entry is valid and oldest ... */ +	advanceTail = (MyProcPid == QUEUE_BACKEND_PID(MyBackendId)) && +		QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL); +	/* ... then mark it invalid */ +	QUEUE_BACKEND_PID(MyBackendId) = InvalidPid; +	LWLockRelease(AsyncQueueLock); + +	/* If we were the laziest backend, try to advance the tail pointer */ +	if (advanceTail) +		asyncQueueAdvanceTail(); +} + +/* + * Test whether there is room to insert more notification messages. + * + * Caller must hold at least shared AsyncQueueLock. + */ +static bool +asyncQueueIsFull(void) +{ +	int			nexthead; +	int			boundary; + +	/* +	 * The queue is full if creating a new head page would create a page that +	 * logically precedes the current global tail pointer, ie, the head +	 * pointer would wrap around compared to the tail.  We cannot create such +	 * a head page for fear of confusing slru.c.  For safety we round the tail +	 * pointer back to a segment boundary (compare the truncation logic in +	 * asyncQueueAdvanceTail). +	 * +	 * Note that this test is *not* dependent on how much space there is on +	 * the current head page.  This is necessary because asyncQueueAddEntries +	 * might try to create the next head page in any case. +	 */ +	nexthead = QUEUE_POS_PAGE(QUEUE_HEAD) + 1; +	if (nexthead > QUEUE_MAX_PAGE) +		nexthead = 0;			/* wrap around */ +	boundary = QUEUE_POS_PAGE(QUEUE_TAIL); +	boundary -= boundary % SLRU_PAGES_PER_SEGMENT; +	return asyncQueuePagePrecedesLogically(nexthead, boundary); +} + +/* + * Advance the QueuePosition to the next entry, assuming that the current + * entry is of length entryLength.  If we jump to a new page the function + * returns true, else false. + */ +static bool +asyncQueueAdvance(QueuePosition *position, int entryLength)  { -	TupleDesc	tdesc = RelationGetDescr(lRel); -	HeapScanDesc scan; -	HeapTuple	lTuple, -				rTuple; -	Datum		value[Natts_pg_listener]; -	bool		repl[Natts_pg_listener], -				nulls[Natts_pg_listener]; - -	/* preset data to update notify column to MyProcPid */ -	memset(nulls, false, sizeof(nulls)); -	memset(repl, false, sizeof(repl)); -	repl[Anum_pg_listener_notification - 1] = true; -	memset(value, 0, sizeof(value)); -	value[Anum_pg_listener_notification - 1] = Int32GetDatum(MyProcPid); - -	scan = heap_beginscan(lRel, SnapshotNow, 0, NULL); - -	while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) +	int		pageno = QUEUE_POS_PAGE(*position); +	int		offset = QUEUE_POS_OFFSET(*position); +	bool	pageJump = false; + +	/* +	 * Move to the next writing position: First jump over what we have just +	 * written or read. +	 */ +	offset += entryLength; +	Assert(offset <= QUEUE_PAGESIZE); + +	/* +	 * In a second step check if another entry can possibly be written to the +	 * page. If so, stay here, we have reached the next position. If not, then +	 * we need to move on to the next page. +	 */ +	if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE)  	{ -		Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple); -		char	   *relname = NameStr(listener->relname); -		int32		listenerPID = listener->listenerpid; +		pageno++; +		if (pageno > QUEUE_MAX_PAGE) +			pageno = 0;							/* wrap around */ +		offset = 0; +		pageJump = true; +	} -		if (!AsyncExistsPendingNotify(relname)) -			continue; +	SET_QUEUE_POS(*position, pageno, offset); +	return pageJump; +} -		if (listenerPID == MyProcPid) -		{ -			/* -			 * Self-notify: no need to bother with table update. Indeed, we -			 * *must not* clear the notification field in this path, or we -			 * could lose an outside notify, which'd be bad for applications -			 * that ignore self-notify messages. -			 */ -			if (Trace_notify) -				elog(DEBUG1, "AtCommit_Notify: notifying self"); +/* + * Fill the AsyncQueueEntry at *qe with an outbound notification message. + */ +static void +asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe) +{ +	size_t	channellen = strlen(n->channel); +	size_t	payloadlen = strlen(n->payload); +	int		entryLength; + +	Assert(channellen < NAMEDATALEN); +	Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH); + +	/* The terminators are already included in AsyncQueueEntryEmptySize */ +	entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen; +	entryLength = QUEUEALIGN(entryLength); +	qe->length = entryLength; +	qe->dboid = MyDatabaseId; +	qe->xid = GetCurrentTransactionId(); +	qe->srcPid = MyProcPid; +	memcpy(qe->data, n->channel, channellen + 1); +	memcpy(qe->data + channellen + 1, n->payload, payloadlen + 1); +} + +/* + * Add pending notifications to the queue. + * + * We go page by page here, i.e. we stop once we have to go to a new page but + * we will be called again and then fill that next page. If an entry does not + * fit into the current page, we write a dummy entry with an InvalidOid as the + * database OID in order to fill the page. So every page is always used up to + * the last byte which simplifies reading the page later. + * + * We are passed the list cell containing the next notification to write + * and return the first still-unwritten cell back.  Eventually we will return + * NULL indicating all is done. + * + * We are holding AsyncQueueLock already from the caller and grab AsyncCtlLock + * locally in this function. + */ +static ListCell * +asyncQueueAddEntries(ListCell *nextNotify) +{ +	AsyncQueueEntry	qe; +	int				pageno; +	int				offset; +	int				slotno; + +	/* We hold both AsyncQueueLock and AsyncCtlLock during this operation */ +	LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE); + +	/* Fetch the current page */ +	pageno = QUEUE_POS_PAGE(QUEUE_HEAD); +	slotno = SimpleLruReadPage(AsyncCtl, pageno, true, InvalidTransactionId); +	/* Note we mark the page dirty before writing in it */ +	AsyncCtl->shared->page_dirty[slotno] = true; + +	while (nextNotify != NULL) +	{ +		Notification   *n = (Notification *) lfirst(nextNotify); -			NotifyMyFrontEnd(relname, listenerPID); +		/* Construct a valid queue entry in local variable qe */ +		asyncQueueNotificationToEntry(n, &qe); + +		offset = QUEUE_POS_OFFSET(QUEUE_HEAD); + +		/* Check whether the entry really fits on the current page */ +		if (offset + qe.length <= QUEUE_PAGESIZE) +		{ +			/* OK, so advance nextNotify past this item */ +			nextNotify = lnext(nextNotify);  		}  		else  		{ -			if (Trace_notify) -				elog(DEBUG1, "AtCommit_Notify: notifying pid %d", -					 listenerPID); +			/* +			 * Write a dummy entry to fill up the page. Actually readers will +			 * only check dboid and since it won't match any reader's database +			 * OID, they will ignore this entry and move on. +			 */ +			qe.length = QUEUE_PAGESIZE - offset; +			qe.dboid = InvalidOid; +			qe.data[0] = '\0'; /* empty channel */ +			qe.data[1] = '\0'; /* empty payload */ +		} +		/* Now copy qe into the shared buffer page */ +		memcpy(AsyncCtl->shared->page_buffer[slotno] + offset, +			   &qe, +			   qe.length); + +		/* Advance QUEUE_HEAD appropriately, and note if page is full */ +		if (asyncQueueAdvance(&(QUEUE_HEAD), qe.length)) +		{  			/* -			 * If someone has already notified this listener, we don't bother -			 * modifying the table, but we do still send a NOTIFY_INTERRUPT -			 * signal, just in case that backend missed the earlier signal for -			 * some reason.  It's OK to send the signal first, because the -			 * other guy can't read pg_listener until we unlock it. -			 * -			 * Note: we don't have the other guy's BackendId available, so -			 * this will incur a search of the ProcSignal table.  That's -			 * probably not worth worrying about. +			 * Page is full, so we're done here, but first fill the next +			 * page with zeroes.  The reason to do this is to ensure that +			 * slru.c's idea of the head page is always the same as ours, +			 * which avoids boundary problems in SimpleLruTruncate.  The +			 * test in asyncQueueIsFull() ensured that there is room to +			 * create this page without overrunning the queue.  			 */ -			if (SendProcSignal(listenerPID, PROCSIG_NOTIFY_INTERRUPT, -							   InvalidBackendId) < 0) +			slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD)); +			/* And exit the loop */ +			break; +		} +	} + +	LWLockRelease(AsyncCtlLock); + +	return nextNotify; +} + +/* + * Check whether the queue is at least half full, and emit a warning if so. + * + * This is unlikely given the size of the queue, but possible. + * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL. + * + * Caller must hold exclusive AsyncQueueLock. + */ +static void +asyncQueueFillWarning(void) +{ +	int				headPage = QUEUE_POS_PAGE(QUEUE_HEAD); +	int				tailPage = QUEUE_POS_PAGE(QUEUE_TAIL); +	int				occupied; +	double			fillDegree; +	TimestampTz		t; + +	occupied = headPage - tailPage; + +	if (occupied == 0) +		return;					/* fast exit for common case */ +	 +	if (occupied < 0) +	{ +		/* head has wrapped around, tail not yet */ +		occupied += QUEUE_MAX_PAGE+1; +	} + +	fillDegree = (double) occupied / (double) ((QUEUE_MAX_PAGE+1)/2); + +	if (fillDegree < 0.5) +		return; + +	t = GetCurrentTimestamp(); + +	if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn, +								   t, QUEUE_FULL_WARN_INTERVAL)) +	{ +		QueuePosition	min = QUEUE_HEAD; +		int32			minPid = InvalidPid; +		int				i; + +		for (i = 1; i <= MaxBackends; i++) +		{ +			if (QUEUE_BACKEND_PID(i) != InvalidPid)  			{ -				/* -				 * Get rid of pg_listener entry if it refers to a PID that no -				 * longer exists.  Presumably, that backend crashed without -				 * deleting its pg_listener entries. This code used to only -				 * delete the entry if errno==ESRCH, but as far as I can see -				 * we should just do it for any failure (certainly at least -				 * for EPERM too...) -				 */ -				simple_heap_delete(lRel, &lTuple->t_self); +				min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i)); +				if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i))) +					minPid = QUEUE_BACKEND_PID(i);  			} -			else if (listener->notification == 0) -			{ -				/* Rewrite the tuple with my PID in notification column */ -				rTuple = heap_modify_tuple(lTuple, tdesc, value, nulls, repl); -				simple_heap_update(lRel, &lTuple->t_self, rTuple); +		} + +		ereport(WARNING, +				(errmsg("pg_notify queue is %.0f%% full", fillDegree * 100), +				 (minPid != InvalidPid ? +				  errdetail("PID %d is among the slowest backends.", minPid) +				  : 0), +				 (minPid != InvalidPid ? +				  errhint("Cleanup can only proceed if this backend ends its current transaction.") +				  : 0))); + +		asyncQueueControl->lastQueueFillWarn = t; +	} +} + +/* + * Send signals to all listening backends (except our own). + * + * Returns true if we sent at least one signal. + * + * Since we need EXCLUSIVE lock anyway we also check the position of the other + * backends and in case one is already up-to-date we don't signal it. + * This can happen if concurrent notifying transactions have sent a signal and + * the signaled backend has read the other notifications and ours in the same + * step. + * + * Since we know the BackendId and the Pid the signalling is quite cheap. + */ +static bool +SignalBackends(void) +{ +	bool		signalled = false; +	int32	   *pids; +	BackendId  *ids; +	int			count; +	int			i; +	int32		pid; -#ifdef NOT_USED					/* currently there are no indexes */ -				CatalogUpdateIndexes(lRel, rTuple); -#endif +	/* +	 * Identify all backends that are listening and not already up-to-date. +	 * We don't want to send signals while holding the AsyncQueueLock, so +	 * we just build a list of target PIDs. +	 * +	 * XXX in principle these pallocs could fail, which would be bad. +	 * Maybe preallocate the arrays?  But in practice this is only run +	 * in trivial transactions, so there should surely be space available. +	 */ +	pids = (int32 *) palloc(MaxBackends * sizeof(int32)); +	ids = (BackendId *) palloc(MaxBackends * sizeof(BackendId)); +	count = 0; + +	LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE); +	for (i = 1; i <= MaxBackends; i++) +	{ +		pid = QUEUE_BACKEND_PID(i); +		if (pid != InvalidPid && pid != MyProcPid) +		{ +			QueuePosition pos = QUEUE_BACKEND_POS(i); + +			if (!QUEUE_POS_EQUAL(pos, QUEUE_HEAD)) +			{ +				pids[count] = pid; +				ids[count] = i; +				count++;  			}  		}  	} +	LWLockRelease(AsyncQueueLock); -	heap_endscan(scan); +	/* Now send signals */ +	for (i = 0; i < count; i++) +	{ +		pid = pids[i]; + +		/* +		 * Note: assuming things aren't broken, a signal failure here could +		 * only occur if the target backend exited since we released +		 * AsyncQueueLock; which is unlikely but certainly possible. +		 * So we just log a low-level debug message if it happens. +		 */ +		if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0) +			elog(DEBUG3, "could not signal backend with PID %d: %m", pid); +		else +			signalled = true; +	} + +	pfree(pids); +	pfree(ids); + +	return signalled;  }  /*   * AtAbort_Notify   * - *		This is called at transaction abort. + *	This is called at transaction abort.   * - *		Gets rid of pending actions and outbound notifies that we would have - *		executed if the transaction got committed. + *	Gets rid of pending actions and outbound notifies that we would have + *	executed if the transaction got committed.   */  void  AtAbort_Notify(void)  { +	/* +	 * If we LISTEN but then roll back the transaction we have set our pointer +	 * but have not made any entry in listenChannels. In that case, remove +	 * our pointer again. +	 */ +	if (backendHasExecutedInitialListen) +	{ +		/* +		 * Checking listenChannels should be redundant but it can't hurt doing +		 * it for safety reasons. +		 */ +		if (listenChannels == NIL) +			asyncQueueUnregister(); + +		backendHasExecutedInitialListen = false; +	} + +	/* And clean up */  	ClearPendingActionsAndNotifies();  } @@ -940,30 +1786,298 @@ DisableNotifyInterrupt(void)  }  /* + * Read all pending notifications from the queue, and deliver appropriate + * ones to my frontend.  Stop when we reach queue head or an uncommitted + * notification. + */ +static void +asyncQueueReadAllNotifications(void) +{ +	QueuePosition	pos; +	QueuePosition	oldpos; +	QueuePosition	head; +	bool		advanceTail; +	/* page_buffer must be adequately aligned, so use a union */ +	union { +		char		buf[QUEUE_PAGESIZE]; +		AsyncQueueEntry align; +	} page_buffer; + +	/* Fetch current state */ +	LWLockAcquire(AsyncQueueLock, LW_SHARED); +	/* Assert checks that we have a valid state entry */ +	Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId)); +	pos = oldpos = QUEUE_BACKEND_POS(MyBackendId); +	head = QUEUE_HEAD; +	LWLockRelease(AsyncQueueLock); + +	if (QUEUE_POS_EQUAL(pos, head)) +	{ +		/* Nothing to do, we have read all notifications already. */ +		return; +	} + +	/*---------- +	 * Note that we deliver everything that we see in the queue and that +	 * matches our _current_ listening state. +	 * Especially we do not take into account different commit times. +	 * Consider the following example: +	 * +	 * Backend 1:                    Backend 2: +	 * +	 * transaction starts +	 * NOTIFY foo; +	 * commit starts +	 *                               transaction starts +	 *                               LISTEN foo; +	 *                               commit starts +	 * commit to clog +	 *                               commit to clog +	 * +	 * It could happen that backend 2 sees the notification from backend 1 in +	 * the queue.  Even though the notifying transaction committed before +	 * the listening transaction, we still deliver the notification. +	 * +	 * The idea is that an additional notification does not do any harm, we +	 * just need to make sure that we do not miss a notification. +	 * +	 * It is possible that we fail while trying to send a message to our +	 * frontend (for example, because of encoding conversion failure). +	 * If that happens it is critical that we not try to send the same +	 * message over and over again.  Therefore, we place a PG_TRY block +	 * here that will forcibly advance our backend position before we lose +	 * control to an error.  (We could alternatively retake AsyncQueueLock +	 * and move the position before handling each individual message, but +	 * that seems like too much lock traffic.) +	 *---------- +	 */ +	PG_TRY(); +	{ +		bool		reachedStop; + +		do  +		{ +			int			curpage = QUEUE_POS_PAGE(pos); +			int			curoffset = QUEUE_POS_OFFSET(pos); +			int			slotno; +			int			copysize; + +			/* +			 * We copy the data from SLRU into a local buffer, so as to avoid +			 * holding the AsyncCtlLock while we are examining the entries and +			 * possibly transmitting them to our frontend.  Copy only the part +			 * of the page we will actually inspect. +			 */ +			slotno = SimpleLruReadPage_ReadOnly(AsyncCtl, curpage, +												InvalidTransactionId); +			if (curpage == QUEUE_POS_PAGE(head)) +			{ +				/* we only want to read as far as head */ +				copysize = QUEUE_POS_OFFSET(head) - curoffset; +				if (copysize < 0) +					copysize = 0;			/* just for safety */ +			} +			else +			{ +				/* fetch all the rest of the page */ +				copysize = QUEUE_PAGESIZE - curoffset; +			} +			memcpy(page_buffer.buf + curoffset, +				   AsyncCtl->shared->page_buffer[slotno] + curoffset, +				   copysize); +			/* Release lock that we got from SimpleLruReadPage_ReadOnly() */ +			LWLockRelease(AsyncCtlLock); + +			/* +			 * Process messages up to the stop position, end of page, or an +			 * uncommitted message. +			 * +			 * Our stop position is what we found to be the head's position +			 * when we entered this function. It might have changed +			 * already. But if it has, we will receive (or have already +			 * received and queued) another signal and come here again. +			 * +			 * We are not holding AsyncQueueLock here! The queue can only +			 * extend beyond the head pointer (see above) and we leave our +			 * backend's pointer where it is so nobody will truncate or +			 * rewrite pages under us. Especially we don't want to hold a lock +			 * while sending the notifications to the frontend. +			 */ +			reachedStop = asyncQueueProcessPageEntries(&pos, head, +													   page_buffer.buf); +		} while (!reachedStop); +	} +	PG_CATCH(); +	{ +		/* Update shared state */ +		LWLockAcquire(AsyncQueueLock, LW_SHARED); +		QUEUE_BACKEND_POS(MyBackendId) = pos; +		advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL); +		LWLockRelease(AsyncQueueLock); + +		/* If we were the laziest backend, try to advance the tail pointer */ +		if (advanceTail) +			asyncQueueAdvanceTail(); + +		PG_RE_THROW(); +	} +	PG_END_TRY(); + +	/* Update shared state */ +	LWLockAcquire(AsyncQueueLock, LW_SHARED); +	QUEUE_BACKEND_POS(MyBackendId) = pos; +	advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL); +	LWLockRelease(AsyncQueueLock); + +	/* If we were the laziest backend, try to advance the tail pointer */ +	if (advanceTail) +		asyncQueueAdvanceTail(); +} + +/* + * Fetch notifications from the shared queue, beginning at position current, + * and deliver relevant ones to my frontend. + * + * The current page must have been fetched into page_buffer from shared + * memory.  (We could access the page right in shared memory, but that + * would imply holding the AsyncCtlLock throughout this routine.) + * + * We stop if we reach the "stop" position, or reach a notification from an + * uncommitted transaction, or reach the end of the page. + * + * The function returns true once we have reached the stop position or an + * uncommitted notification, and false if we have finished with the page. + * In other words: once it returns true there is no need to look further. + */ +static bool +asyncQueueProcessPageEntries(QueuePosition *current, +							 QueuePosition stop, +							 char *page_buffer) +{ +	bool		reachedStop = false; +	bool		reachedEndOfPage; +	AsyncQueueEntry	*qe; + +	do +	{ +		if (QUEUE_POS_EQUAL(*current, stop)) +			break; + +		qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(*current)); + +		/* +		 * Advance *current over this message, possibly to the next page. +		 * As noted in the comments for asyncQueueReadAllNotifications, we +		 * must do this before possibly failing while processing the message. +		 */ +		reachedEndOfPage = asyncQueueAdvance(current, qe->length); + +		/* Ignore messages destined for other databases */ +		if (qe->dboid == MyDatabaseId) +		{ +			if (TransactionIdDidCommit(qe->xid)) +			{ +				/* qe->data is the null-terminated channel name */ +				char   *channel = qe->data; + +				if (IsListeningOn(channel)) +				{ +					/* payload follows channel name */ +					char   *payload = qe->data + strlen(channel) + 1; + +					NotifyMyFrontEnd(channel, payload, qe->srcPid); +				} +			} +			else if (TransactionIdDidAbort(qe->xid)) +			{ +				/* +				 * If the source transaction aborted, we just ignore its +				 * notifications. +				 */ +			} +			else +			{ +				/* +				 * The transaction has neither committed nor aborted so far, +				 * so we can't process its message yet.  Break out of the loop. +				 */ +				reachedStop = true; +				break; +			} +		} + +		/* Loop back if we're not at end of page */ +	} while (!reachedEndOfPage); + +	if (QUEUE_POS_EQUAL(*current, stop)) +		reachedStop = true; + +	return reachedStop; +} + +/* + * Advance the shared queue tail variable to the minimum of all the + * per-backend tail pointers.  Truncate pg_notify space if possible. + */ +static void +asyncQueueAdvanceTail(void) +{ +	QueuePosition	min; +	int				i; +	int				oldtailpage; +	int				newtailpage; +	int				boundary; + +	LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE); +	min = QUEUE_HEAD; +	for (i = 1; i <= MaxBackends; i++) +	{ +		if (QUEUE_BACKEND_PID(i) != InvalidPid) +			min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i)); +	} +	oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL); +	QUEUE_TAIL = min; +	LWLockRelease(AsyncQueueLock); + +	/* +	 * We can truncate something if the global tail advanced across an SLRU +	 * segment boundary. +	 * +	 * XXX it might be better to truncate only once every several segments, +	 * to reduce the number of directory scans. +	 */ +	newtailpage = QUEUE_POS_PAGE(min); +	boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT); +	if (asyncQueuePagePrecedesLogically(oldtailpage, boundary)) +	{ +		/* +		 * SimpleLruTruncate() will ask for AsyncCtlLock but will also +		 * release the lock again. +		 */ +		SimpleLruTruncate(AsyncCtl, newtailpage); +	} +} + +/*   * ProcessIncomingNotify   *   *		Deal with arriving NOTIFYs from other backends.   *		This is called either directly from the PROCSIG_NOTIFY_INTERRUPT   *		signal handler, or the next time control reaches the outer idle loop. - *		Scan pg_listener for arriving notifies, report them to my front end, - *		and clear the notification field in pg_listener until next time. + *		Scan the queue for arriving notifications and report them to my front + *		end.   *   *		NOTE: since we are outside any transaction, we must create our own.   */  static void  ProcessIncomingNotify(void)  { -	Relation	lRel; -	TupleDesc	tdesc; -	ScanKeyData key[1]; -	HeapScanDesc scan; -	HeapTuple	lTuple, -				rTuple; -	Datum		value[Natts_pg_listener]; -	bool		repl[Natts_pg_listener], -				nulls[Natts_pg_listener];  	bool		catchup_enabled; +	/* Do nothing if we aren't actively listening */ +	if (listenChannels == NIL) +		return; +  	/* Must prevent catchup interrupt while I am running */  	catchup_enabled = DisableCatchupInterrupt(); @@ -974,62 +2088,13 @@ ProcessIncomingNotify(void)  	notifyInterruptOccurred = 0; -	StartTransactionCommand(); - -	lRel = heap_open(ListenerRelationId, ExclusiveLock); -	tdesc = RelationGetDescr(lRel); - -	/* Scan only entries with my listenerPID */ -	ScanKeyInit(&key[0], -				Anum_pg_listener_listenerpid, -				BTEqualStrategyNumber, F_INT4EQ, -				Int32GetDatum(MyProcPid)); -	scan = heap_beginscan(lRel, SnapshotNow, 1, key); - -	/* Prepare data for rewriting 0 into notification field */ -	memset(nulls, false, sizeof(nulls)); -	memset(repl, false, sizeof(repl)); -	repl[Anum_pg_listener_notification - 1] = true; -	memset(value, 0, sizeof(value)); -	value[Anum_pg_listener_notification - 1] = Int32GetDatum(0); - -	while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) -	{ -		Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple); -		char	   *relname = NameStr(listener->relname); -		int32		sourcePID = listener->notification; - -		if (sourcePID != 0) -		{ -			/* Notify the frontend */ - -			if (Trace_notify) -				elog(DEBUG1, "ProcessIncomingNotify: received %s from %d", -					 relname, (int) sourcePID); - -			NotifyMyFrontEnd(relname, sourcePID); - -			/* -			 * Rewrite the tuple with 0 in notification column. -			 */ -			rTuple = heap_modify_tuple(lTuple, tdesc, value, nulls, repl); -			simple_heap_update(lRel, &lTuple->t_self, rTuple); - -#ifdef NOT_USED					/* currently there are no indexes */ -			CatalogUpdateIndexes(lRel, rTuple); -#endif -		} -	} -	heap_endscan(scan); -  	/* -	 * We do NOT release the lock on pg_listener here; we need to hold it -	 * until end of transaction (which is about to happen, anyway) to ensure -	 * that other backends see our tuple updates when they look. Otherwise, a -	 * transaction started after this one might mistakenly think it doesn't -	 * need to send this backend a new NOTIFY. +	 * We must run asyncQueueReadAllNotifications inside a transaction, +	 * else bad things happen if it gets an error.  	 */ -	heap_close(lRel, NoLock); +	StartTransactionCommand(); + +	asyncQueueReadAllNotifications();  	CommitTransactionCommand(); @@ -1051,20 +2116,17 @@ ProcessIncomingNotify(void)   * Send NOTIFY message to my front end.   */  static void -NotifyMyFrontEnd(char *relname, int32 listenerPID) +NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)  {  	if (whereToSendOutput == DestRemote)  	{  		StringInfoData buf;  		pq_beginmessage(&buf, 'A'); -		pq_sendint(&buf, listenerPID, sizeof(int32)); -		pq_sendstring(&buf, relname); +		pq_sendint(&buf, srcPid, sizeof(int32)); +		pq_sendstring(&buf, channel);  		if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3) -		{ -			/* XXX Add parameter string here later */ -			pq_sendstring(&buf, ""); -		} +			pq_sendstring(&buf, payload);  		pq_endmessage(&buf);  		/* @@ -1074,20 +2136,51 @@ NotifyMyFrontEnd(char *relname, int32 listenerPID)  		 */  	}  	else -		elog(INFO, "NOTIFY for %s", relname); +		elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);  } -/* Does pendingNotifies include the given relname? */ +/* Does pendingNotifies include the given channel/payload? */  static bool -AsyncExistsPendingNotify(const char *relname) +AsyncExistsPendingNotify(const char *channel, const char *payload)  {  	ListCell   *p; +	Notification *n; + +	if (pendingNotifies == NIL) +		return false; + +	if (payload == NULL) +		payload = ""; + +	/*---------- +	 * We need to append new elements to the end of the list in order to keep +	 * the order. However, on the other hand we'd like to check the list +	 * backwards in order to make duplicate-elimination a tad faster when the +	 * same condition is signaled many times in a row. So as a compromise we +	 * check the tail element first which we can access directly. If this +	 * doesn't match, we check the whole list. +	 * +	 * As we are not checking our parents' lists, we can still get duplicates +	 * in combination with subtransactions, like in: +	 * +	 * begin; +	 * notify foo '1'; +	 * savepoint foo; +	 * notify foo '1'; +	 * commit; +	 *---------- +	 */ +	n = (Notification *) llast(pendingNotifies); +	if (strcmp(n->channel, channel) == 0 && +		strcmp(n->payload, payload) == 0) +		return true;  	foreach(p, pendingNotifies)  	{ -		const char *prelname = (const char *) lfirst(p); +		n = (Notification *) lfirst(p); -		if (strcmp(prelname, relname) == 0) +		if (strcmp(n->channel, channel) == 0 && +			strcmp(n->payload, payload) == 0)  			return true;  	} @@ -1108,21 +2201,3 @@ ClearPendingActionsAndNotifies(void)  	pendingActions = NIL;  	pendingNotifies = NIL;  } - -/* - * 2PC processing routine for COMMIT PREPARED case. - * - * (We don't have to do anything for ROLLBACK PREPARED.) - */ -void -notify_twophase_postcommit(TransactionId xid, uint16 info, -						   void *recdata, uint32 len) -{ -	/* -	 * Set up to issue the NOTIFY at the end of my own current transaction. -	 * (XXX this has some issues if my own transaction later rolls back, or if -	 * there is any significant delay before I commit.	OK for now because we -	 * disallow COMMIT PREPARED inside a transaction block.) -	 */ -	Async_Notify((char *) recdata); -} diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 271a7a2129b..371d1f245eb 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -15,7 +15,7 @@   * Portions Copyright (c) 1994, Regents of the University of California   *   * IDENTIFICATION - *	  $PostgreSQL: pgsql/src/backend/nodes/copyfuncs.c,v 1.461 2010/02/12 17:33:20 tgl Exp $ + *	  $PostgreSQL: pgsql/src/backend/nodes/copyfuncs.c,v 1.462 2010/02/16 22:34:43 tgl Exp $   *   *-------------------------------------------------------------------------   */ @@ -2777,6 +2777,7 @@ _copyNotifyStmt(NotifyStmt *from)  	NotifyStmt *newnode = makeNode(NotifyStmt);  	COPY_STRING_FIELD(conditionname); +	COPY_STRING_FIELD(payload);  	return newnode;  } diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 001c2096b4e..7dfc1969f5d 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -22,7 +22,7 @@   * Portions Copyright (c) 1994, Regents of the University of California   *   * IDENTIFICATION - *	  $PostgreSQL: pgsql/src/backend/nodes/equalfuncs.c,v 1.382 2010/02/12 17:33:20 tgl Exp $ + *	  $PostgreSQL: pgsql/src/backend/nodes/equalfuncs.c,v 1.383 2010/02/16 22:34:43 tgl Exp $   *   *-------------------------------------------------------------------------   */ @@ -1325,6 +1325,7 @@ static bool  _equalNotifyStmt(NotifyStmt *a, NotifyStmt *b)  {  	COMPARE_STRING_FIELD(conditionname); +	COMPARE_STRING_FIELD(payload);  	return true;  } diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 593fe397d3f..878e4b3b94c 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -8,7 +8,7 @@   *   *   * IDENTIFICATION - *	  $PostgreSQL: pgsql/src/backend/nodes/outfuncs.c,v 1.382 2010/02/12 17:33:20 tgl Exp $ + *	  $PostgreSQL: pgsql/src/backend/nodes/outfuncs.c,v 1.383 2010/02/16 22:34:43 tgl Exp $   *   * NOTES   *	  Every node type that can appear in stored rules' parsetrees *must* @@ -1820,6 +1820,7 @@ _outNotifyStmt(StringInfo str, NotifyStmt *node)  	WRITE_NODE_TYPE("NOTIFY");  	WRITE_STRING_FIELD(conditionname); +	WRITE_STRING_FIELD(payload);  }  static void diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 35ba22203fe..f28191d2d00 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -8,7 +8,7 @@   *   *   * IDENTIFICATION - *	  $PostgreSQL: pgsql/src/backend/nodes/readfuncs.c,v 1.231 2010/02/12 17:33:20 tgl Exp $ + *	  $PostgreSQL: pgsql/src/backend/nodes/readfuncs.c,v 1.232 2010/02/16 22:34:43 tgl Exp $   *   * NOTES   *	  Path and Plan nodes do not have any readfuncs support, because we @@ -231,6 +231,7 @@ _readNotifyStmt(void)  	READ_LOCALS(NotifyStmt);  	READ_STRING_FIELD(conditionname); +	READ_STRING_FIELD(payload);  	READ_DONE();  } diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index da70ee089c5..235a7001adf 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -11,7 +11,7 @@   *   *   * IDENTIFICATION - *	  $PostgreSQL: pgsql/src/backend/parser/gram.y,v 2.708 2010/02/12 17:33:20 tgl Exp $ + *	  $PostgreSQL: pgsql/src/backend/parser/gram.y,v 2.709 2010/02/16 22:34:49 tgl Exp $   *   * HISTORY   *	  AUTHOR			DATE			MAJOR EVENT @@ -400,7 +400,7 @@ static TypeName *TableFuncTypeName(List *columns);  %type <ival>	Iconst SignedIconst  %type <list>	Iconst_list -%type <str>		Sconst comment_text +%type <str>		Sconst comment_text notify_payload  %type <str>		RoleId opt_granted_by opt_boolean ColId_or_Sconst  %type <list>	var_list  %type <str>		ColId ColLabel var_name type_function_name param_name @@ -6123,14 +6123,20 @@ DropRuleStmt:   *   *****************************************************************************/ -NotifyStmt: NOTIFY ColId +NotifyStmt: NOTIFY ColId notify_payload  				{  					NotifyStmt *n = makeNode(NotifyStmt);  					n->conditionname = $2; +					n->payload = $3;  					$$ = (Node *)n;  				}  		; +notify_payload: +			',' Sconst							{ $$ = $2; } +			| /*EMPTY*/							{ $$ = NULL; } +		; +  ListenStmt: LISTEN ColId  				{  					ListenStmt *n = makeNode(ListenStmt); diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 01a9fabc8c9..b4923d278a7 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -8,7 +8,7 @@   *   *   * IDENTIFICATION - *	  $PostgreSQL: pgsql/src/backend/storage/ipc/ipci.c,v 1.103 2010/01/15 09:19:03 heikki Exp $ + *	  $PostgreSQL: pgsql/src/backend/storage/ipc/ipci.c,v 1.104 2010/02/16 22:34:50 tgl Exp $   *   *-------------------------------------------------------------------------   */ @@ -20,6 +20,7 @@  #include "access/nbtree.h"  #include "access/subtrans.h"  #include "access/twophase.h" +#include "commands/async.h"  #include "miscadmin.h"  #include "pgstat.h"  #include "postmaster/autovacuum.h" @@ -122,6 +123,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)  		size = add_size(size, WalRcvShmemSize());  		size = add_size(size, BTreeShmemSize());  		size = add_size(size, SyncScanShmemSize()); +		size = add_size(size, AsyncShmemSize());  #ifdef EXEC_BACKEND  		size = add_size(size, ShmemBackendArraySize());  #endif @@ -225,6 +227,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)  	 */  	BTreeShmemInit();  	SyncScanShmemInit(); +	AsyncShmemInit();  #ifdef EXEC_BACKEND diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 4e635d1e8aa..7a6cab968bb 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -15,7 +15,7 @@   * Portions Copyright (c) 1994, Regents of the University of California   *   * IDENTIFICATION - *	  $PostgreSQL: pgsql/src/backend/storage/lmgr/lwlock.c,v 1.55 2010/01/02 16:57:52 momjian Exp $ + *	  $PostgreSQL: pgsql/src/backend/storage/lmgr/lwlock.c,v 1.56 2010/02/16 22:34:50 tgl Exp $   *   *-------------------------------------------------------------------------   */ @@ -24,6 +24,7 @@  #include "access/clog.h"  #include "access/multixact.h"  #include "access/subtrans.h" +#include "commands/async.h"  #include "miscadmin.h"  #include "pg_trace.h"  #include "storage/ipc.h" @@ -174,6 +175,9 @@ NumLWLocks(void)  	/* multixact.c needs two SLRU areas */  	numLocks += NUM_MXACTOFFSET_BUFFERS + NUM_MXACTMEMBER_BUFFERS; +	/* async.c needs one per Async buffer */ +	numLocks += NUM_ASYNC_BUFFERS; +  	/*  	 * Add any requested by loadable modules; for backwards-compatibility  	 * reasons, allocate at least NUM_USER_DEFINED_LWLOCKS of them even if diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 15d0808ad24..2ae15d5ce02 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -8,7 +8,7 @@   *   *   * IDENTIFICATION - *	  $PostgreSQL: pgsql/src/backend/tcop/postgres.c,v 1.589 2010/02/16 20:15:14 momjian Exp $ + *	  $PostgreSQL: pgsql/src/backend/tcop/postgres.c,v 1.590 2010/02/16 22:34:50 tgl Exp $   *   * NOTES   *	  this is the "main" module of the postgres backend and @@ -3779,7 +3779,8 @@ PostgresMain(int argc, char *argv[], const char *username)  		 * collector, and to update the PS stats display.  We avoid doing  		 * those every time through the message loop because it'd slow down  		 * processing of batched messages, and because we don't want to report -		 * uncommitted updates (that confuses autovacuum). +		 * uncommitted updates (that confuses autovacuum).  The notification +		 * processor wants a call too, if we are not in a transaction block.  		 */  		if (send_ready_for_query)  		{ @@ -3795,6 +3796,7 @@ PostgresMain(int argc, char *argv[], const char *username)  			}  			else  			{ +				ProcessCompletedNotifies();  				pgstat_report_stat(false);  				set_ps_display("idle", false); diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 4879970632a..07f4d0c57ad 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -10,7 +10,7 @@   *   *   * IDENTIFICATION - *	  $PostgreSQL: pgsql/src/backend/tcop/utility.c,v 1.332 2010/02/14 18:42:15 rhaas Exp $ + *	  $PostgreSQL: pgsql/src/backend/tcop/utility.c,v 1.333 2010/02/16 22:34:50 tgl Exp $   *   *-------------------------------------------------------------------------   */ @@ -926,17 +926,17 @@ standard_ProcessUtility(Node *parsetree,  		case T_NotifyStmt:  			{  				NotifyStmt *stmt = (NotifyStmt *) parsetree; -				PreventCommandDuringRecovery(); -				Async_Notify(stmt->conditionname); +				PreventCommandDuringRecovery(); +				Async_Notify(stmt->conditionname, stmt->payload);  			}  			break;  		case T_ListenStmt:  			{  				ListenStmt *stmt = (ListenStmt *) parsetree; -				PreventCommandDuringRecovery(); +				PreventCommandDuringRecovery();  				CheckRestrictedOperation("LISTEN");  				Async_Listen(stmt->conditionname);  			} @@ -945,8 +945,8 @@ standard_ProcessUtility(Node *parsetree,  		case T_UnlistenStmt:  			{  				UnlistenStmt *stmt = (UnlistenStmt *) parsetree; -				PreventCommandDuringRecovery(); +				PreventCommandDuringRecovery();  				CheckRestrictedOperation("UNLISTEN");  				if (stmt->conditionname)  					Async_Unlisten(stmt->conditionname); @@ -1105,8 +1105,8 @@ standard_ProcessUtility(Node *parsetree,  		case T_ReindexStmt:  			{  				ReindexStmt *stmt = (ReindexStmt *) parsetree; -				PreventCommandDuringRecovery(); +				PreventCommandDuringRecovery();  				switch (stmt->kind)  				{  					case OBJECT_INDEX: diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index b2127a3129a..294c7c904bb 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -9,7 +9,7 @@   *   *   * IDENTIFICATION - *	  $PostgreSQL: pgsql/src/backend/utils/adt/ruleutils.c,v 1.322 2010/02/14 18:42:16 rhaas Exp $ + *	  $PostgreSQL: pgsql/src/backend/utils/adt/ruleutils.c,v 1.323 2010/02/16 22:34:50 tgl Exp $   *   *-------------------------------------------------------------------------   */ @@ -3465,6 +3465,11 @@ get_utility_query_def(Query *query, deparse_context *context)  							 0, PRETTYINDENT_STD, 1);  		appendStringInfo(buf, "NOTIFY %s",  						 quote_identifier(stmt->conditionname)); +		if (stmt->payload) +		{ +			appendStringInfoString(buf, ", "); +			simple_quote_literal(buf, stmt->payload); +		}  	}  	else  	{ diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c index 06df8f6af8d..733d8ef74d2 100644 --- a/src/bin/initdb/initdb.c +++ b/src/bin/initdb/initdb.c @@ -42,7 +42,7 @@   * Portions Copyright (c) 1994, Regents of the University of California   * Portions taken from FreeBSD.   * - * $PostgreSQL: pgsql/src/bin/initdb/initdb.c,v 1.184 2010/01/26 16:18:12 tgl Exp $ + * $PostgreSQL: pgsql/src/bin/initdb/initdb.c,v 1.185 2010/02/16 22:34:50 tgl Exp $   *   *-------------------------------------------------------------------------   */ @@ -2458,6 +2458,7 @@ main(int argc, char *argv[])  		"pg_xlog",  		"pg_xlog/archive_status",  		"pg_clog", +		"pg_notify",  		"pg_subtrans",  		"pg_twophase",  		"pg_multixact/members", diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c index 8422bb76ab5..9661f0cd803 100644 --- a/src/bin/psql/common.c +++ b/src/bin/psql/common.c @@ -3,7 +3,7 @@   *   * Copyright (c) 2000-2010, PostgreSQL Global Development Group   * - * $PostgreSQL: pgsql/src/bin/psql/common.c,v 1.143 2010/01/02 16:57:59 momjian Exp $ + * $PostgreSQL: pgsql/src/bin/psql/common.c,v 1.144 2010/02/16 22:34:50 tgl Exp $   */  #include "postgres_fe.h"  #include "common.h" @@ -555,8 +555,13 @@ PrintNotifications(void)  	while ((notify = PQnotifies(pset.db)))  	{ -		fprintf(pset.queryFout, _("Asynchronous notification \"%s\" received from server process with PID %d.\n"), -				notify->relname, notify->be_pid); +		/* for backward compatibility, only show payload if nonempty */ +		if (notify->extra[0]) +			fprintf(pset.queryFout, _("Asynchronous notification \"%s\" with payload \"%s\" received from server process with PID %d.\n"), +					notify->relname, notify->extra, notify->be_pid); +		else +			fprintf(pset.queryFout, _("Asynchronous notification \"%s\" received from server process with PID %d.\n"), +					notify->relname, notify->be_pid);  		fflush(pset.queryFout);  		PQfreemem(notify);  	} diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 19b9b1f3ef5..0b10472c12e 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -3,7 +3,7 @@   *   * Copyright (c) 2000-2010, PostgreSQL Global Development Group   * - * $PostgreSQL: pgsql/src/bin/psql/tab-complete.c,v 1.193 2010/02/15 02:55:01 itagaki Exp $ + * $PostgreSQL: pgsql/src/bin/psql/tab-complete.c,v 1.194 2010/02/16 22:34:50 tgl Exp $   */  /*---------------------------------------------------------------------- @@ -1864,7 +1864,7 @@ psql_completion(char *text, int start, int end)  /* NOTIFY */  	else if (pg_strcasecmp(prev_wd, "NOTIFY") == 0) -		COMPLETE_WITH_QUERY("SELECT pg_catalog.quote_ident(relname) FROM pg_catalog.pg_listener WHERE substring(pg_catalog.quote_ident(relname),1,%d)='%s'"); +		COMPLETE_WITH_QUERY("SELECT pg_catalog.quote_ident(channel) FROM pg_catalog.pg_listening_channels() AS channel WHERE substring(pg_catalog.quote_ident(channel),1,%d)='%s'");  /* OPTIONS */  	else if (pg_strcasecmp(prev_wd, "OPTIONS") == 0) @@ -2105,7 +2105,7 @@ psql_completion(char *text, int start, int end)  /* UNLISTEN */  	else if (pg_strcasecmp(prev_wd, "UNLISTEN") == 0) -		COMPLETE_WITH_QUERY("SELECT pg_catalog.quote_ident(relname) FROM pg_catalog.pg_listener WHERE substring(pg_catalog.quote_ident(relname),1,%d)='%s' UNION SELECT '*'"); +		COMPLETE_WITH_QUERY("SELECT pg_catalog.quote_ident(channel) FROM pg_catalog.pg_listening_channels() AS channel WHERE substring(pg_catalog.quote_ident(channel),1,%d)='%s' UNION SELECT '*'");  /* UPDATE */  	/* If prev. word is UPDATE suggest a list of tables */ diff --git a/src/include/access/slru.h b/src/include/access/slru.h index 8e820ae72dd..4cc40ba5f70 100644 --- a/src/include/access/slru.h +++ b/src/include/access/slru.h @@ -6,7 +6,7 @@   * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group   * Portions Copyright (c) 1994, Regents of the University of California   * - * $PostgreSQL: pgsql/src/include/access/slru.h,v 1.25 2010/01/02 16:58:00 momjian Exp $ + * $PostgreSQL: pgsql/src/include/access/slru.h,v 1.26 2010/02/16 22:34:50 tgl Exp $   *   *-------------------------------------------------------------------------   */ @@ -18,6 +18,25 @@  /* + * Define SLRU segment size.  A page is the same BLCKSZ as is used everywhere + * else in Postgres.  The segment size can be chosen somewhat arbitrarily; + * we make it 32 pages by default, or 256Kb, i.e. 1M transactions for CLOG + * or 64K transactions for SUBTRANS. + * + * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF, + * page numbering also wraps around at 0xFFFFFFFF/xxxx_XACTS_PER_PAGE (where + * xxxx is CLOG or SUBTRANS, respectively), and segment numbering at + * 0xFFFFFFFF/xxxx_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need + * take no explicit notice of that fact in slru.c, except when comparing + * segment and page numbers in SimpleLruTruncate (see PagePrecedes()). + * + * Note: slru.c currently assumes that segment file names will be four hex + * digits.  This sets a lower bound on the segment size (64K transactions + * for 32-bit TransactionIds). + */ +#define SLRU_PAGES_PER_SEGMENT	32 + +/*   * Page status codes.  Note that these do not include the "dirty" bit.   * page_dirty can be TRUE only in the VALID or WRITE_IN_PROGRESS states;   * in the latter case it implies that the page has been re-dirtied since @@ -55,8 +74,8 @@ typedef struct SlruSharedData  	/*  	 * Optional array of WAL flush LSNs associated with entries in the SLRU  	 * pages.  If not zero/NULL, we must flush WAL before writing pages (true -	 * for pg_clog, false for multixact and pg_subtrans).  group_lsn[] has -	 * lsn_groups_per_page entries per buffer slot, each containing the +	 * for pg_clog, false for multixact, pg_subtrans, pg_notify).  group_lsn[] +	 * has lsn_groups_per_page entries per buffer slot, each containing the  	 * highest LSN known for a contiguous group of SLRU entries on that slot's  	 * page.  	 */ @@ -94,7 +113,7 @@ typedef struct SlruCtlData  	/*  	 * This flag tells whether to fsync writes (true for pg_clog and multixact -	 * stuff, false for pg_subtrans). +	 * stuff, false for pg_subtrans and pg_notify).  	 */  	bool		do_fsync; diff --git a/src/include/access/twophase_rmgr.h b/src/include/access/twophase_rmgr.h index a42d7745205..1d4d1cb2217 100644 --- a/src/include/access/twophase_rmgr.h +++ b/src/include/access/twophase_rmgr.h @@ -7,7 +7,7 @@   * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group   * Portions Copyright (c) 1994, Regents of the University of California   * - * $PostgreSQL: pgsql/src/include/access/twophase_rmgr.h,v 1.11 2010/01/02 16:58:00 momjian Exp $ + * $PostgreSQL: pgsql/src/include/access/twophase_rmgr.h,v 1.12 2010/02/16 22:34:50 tgl Exp $   *   *-------------------------------------------------------------------------   */ @@ -23,9 +23,8 @@ typedef uint8 TwoPhaseRmgrId;   */  #define TWOPHASE_RM_END_ID			0  #define TWOPHASE_RM_LOCK_ID			1 -#define TWOPHASE_RM_NOTIFY_ID		2 -#define TWOPHASE_RM_PGSTAT_ID		3 -#define TWOPHASE_RM_MULTIXACT_ID	4 +#define TWOPHASE_RM_PGSTAT_ID		2 +#define TWOPHASE_RM_MULTIXACT_ID	3  #define TWOPHASE_RM_MAX_ID			TWOPHASE_RM_MULTIXACT_ID  extern const TwoPhaseCallback twophase_recover_callbacks[]; diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 9a1d19380c7..2d63232b9b5 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -37,7 +37,7 @@   * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group   * Portions Copyright (c) 1994, Regents of the University of California   * - * $PostgreSQL: pgsql/src/include/catalog/catversion.h,v 1.584 2010/02/12 17:33:20 tgl Exp $ + * $PostgreSQL: pgsql/src/include/catalog/catversion.h,v 1.585 2010/02/16 22:34:54 tgl Exp $   *   *-------------------------------------------------------------------------   */ @@ -53,6 +53,6 @@   */  /*							yyyymmddN */ -#define CATALOG_VERSION_NO	201002121 +#define CATALOG_VERSION_NO	201002161  #endif diff --git a/src/include/catalog/pg_listener.h b/src/include/catalog/pg_listener.h deleted file mode 100644 index 4d1a77d6602..00000000000 --- a/src/include/catalog/pg_listener.h +++ /dev/null @@ -1,59 +0,0 @@ -/*------------------------------------------------------------------------- - * - * pg_listener.h - *	  Asynchronous notification - * - * - * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group - * Portions Copyright (c) 1994, Regents of the University of California - * - * $PostgreSQL: pgsql/src/include/catalog/pg_listener.h,v 1.28 2010/01/05 01:06:56 tgl Exp $ - * - * NOTES - *	  the genbki.pl script reads this file and generates .bki - *	  information from the DATA() statements. - * - *------------------------------------------------------------------------- - */ -#ifndef PG_LISTENER_H -#define PG_LISTENER_H - -#include "catalog/genbki.h" - -/* ---------------------------------------------------------------- - *		pg_listener definition. - * - *		cpp turns this into typedef struct FormData_pg_listener - * ---------------------------------------------------------------- - */ -#define ListenerRelationId	2614 - -CATALOG(pg_listener,2614) BKI_WITHOUT_OIDS -{ -	NameData	relname; -	int4		listenerpid; -	int4		notification; -} FormData_pg_listener; - -/* ---------------- - *		Form_pg_listener corresponds to a pointer to a tuple with - *		the format of pg_listener relation. - * ---------------- - */ -typedef FormData_pg_listener *Form_pg_listener; - -/* ---------------- - *		compiler constants for pg_listener - * ---------------- - */ -#define Natts_pg_listener						3 -#define Anum_pg_listener_relname				1 -#define Anum_pg_listener_listenerpid			2 -#define Anum_pg_listener_notification			3 - -/* ---------------- - *		initial contents of pg_listener are NOTHING. - * ---------------- - */ - -#endif   /* PG_LISTENER_H */ diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 727b13e264c..1c87a94a0de 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -7,7 +7,7 @@   * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group   * Portions Copyright (c) 1994, Regents of the University of California   * - * $PostgreSQL: pgsql/src/include/catalog/pg_proc.h,v 1.568 2010/02/07 20:48:11 tgl Exp $ + * $PostgreSQL: pgsql/src/include/catalog/pg_proc.h,v 1.569 2010/02/16 22:34:56 tgl Exp $   *   * NOTES   *	  The script catalog/genbki.pl reads this file and generates .bki @@ -4131,8 +4131,12 @@ DATA(insert OID = 2599 (  pg_timezone_abbrevs	PGNSP PGUID 12 1 1000 0 f f f t t  DESCR("get the available time zone abbreviations");  DATA(insert OID = 2856 (  pg_timezone_names		PGNSP PGUID 12 1 1000 0 f f f t t s 0 0 2249 "" "{25,25,1186,16}" "{o,o,o,o}" "{name,abbrev,utc_offset,is_dst}" _null_ pg_timezone_names _null_ _null_ _null_ ));  DESCR("get the available time zone names"); -DATA(insert OID = 2730 (  pg_get_triggerdef    PGNSP PGUID 12 1 0 0 f f f t f s 2 0 25 "26 16" _null_ _null_ _null_ _null_ pg_get_triggerdef_ext _null_ _null_ _null_ )); +DATA(insert OID = 2730 (  pg_get_triggerdef		PGNSP PGUID 12 1 0 0 f f f t f s 2 0 25 "26 16" _null_ _null_ _null_ _null_ pg_get_triggerdef_ext _null_ _null_ _null_ ));  DESCR("trigger description with pretty-print option"); +DATA(insert OID = 3035 (  pg_listening_channels	PGNSP PGUID 12 1 10 0 f f f t t s 0 0 25 "" _null_ _null_ _null_ _null_ pg_listening_channels _null_ _null_ _null_ )); +DESCR("get the channels that the current backend listens to"); +DATA(insert OID = 3036 (  pg_notify				PGNSP PGUID 12 1 0 0 f f f f f v 2 0 2278 "25 25" _null_ _null_ _null_ _null_ pg_notify _null_ _null_ _null_ )); +DESCR("send a notification event");  /* non-persistent series generator */  DATA(insert OID = 1066 (  generate_series PGNSP PGUID 12 1 1000 0 f f f t t i 3 0 23 "23 23 23" _null_ _null_ _null_ _null_ generate_series_step_int4 _null_ _null_ _null_ )); diff --git a/src/include/commands/async.h b/src/include/commands/async.h index 5c9e8ab8906..a9e4d42853d 100644 --- a/src/include/commands/async.h +++ b/src/include/commands/async.h @@ -6,28 +6,44 @@   * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group   * Portions Copyright (c) 1994, Regents of the University of California   * - * $PostgreSQL: pgsql/src/include/commands/async.h,v 1.39 2010/01/02 16:58:03 momjian Exp $ + * $PostgreSQL: pgsql/src/include/commands/async.h,v 1.40 2010/02/16 22:34:57 tgl Exp $   *   *-------------------------------------------------------------------------   */  #ifndef ASYNC_H  #define ASYNC_H +#include "fmgr.h" + +/* + * The number of SLRU page buffers we use for the notification queue. + */ +#define NUM_ASYNC_BUFFERS	8 +  extern bool Trace_notify; +extern Size AsyncShmemSize(void); +extern void AsyncShmemInit(void); +  /* notify-related SQL statements */ -extern void Async_Notify(const char *relname); -extern void Async_Listen(const char *relname); -extern void Async_Unlisten(const char *relname); +extern void Async_Notify(const char *channel, const char *payload); +extern void Async_Listen(const char *channel); +extern void Async_Unlisten(const char *channel);  extern void Async_UnlistenAll(void); +/* notify-related SQL functions */ +extern Datum pg_listening_channels(PG_FUNCTION_ARGS); +extern Datum pg_notify(PG_FUNCTION_ARGS); +  /* perform (or cancel) outbound notify processing at transaction commit */ +extern void PreCommit_Notify(void);  extern void AtCommit_Notify(void);  extern void AtAbort_Notify(void);  extern void AtSubStart_Notify(void);  extern void AtSubCommit_Notify(void);  extern void AtSubAbort_Notify(void);  extern void AtPrepare_Notify(void); +extern void ProcessCompletedNotifies(void);  /* signal handler for inbound notifies (PROCSIG_NOTIFY_INTERRUPT) */  extern void HandleNotifyInterrupt(void); @@ -40,7 +56,4 @@ extern void HandleNotifyInterrupt(void);  extern void EnableNotifyInterrupt(void);  extern bool DisableNotifyInterrupt(void); -extern void notify_twophase_postcommit(TransactionId xid, uint16 info, -						   void *recdata, uint32 len); -  #endif   /* ASYNC_H */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 0c3aecfa6e6..ca229c8e23d 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -13,7 +13,7 @@   * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group   * Portions Copyright (c) 1994, Regents of the University of California   * - * $PostgreSQL: pgsql/src/include/nodes/parsenodes.h,v 1.429 2010/02/12 17:33:20 tgl Exp $ + * $PostgreSQL: pgsql/src/include/nodes/parsenodes.h,v 1.430 2010/02/16 22:34:57 tgl Exp $   *   *-------------------------------------------------------------------------   */ @@ -2097,6 +2097,7 @@ typedef struct NotifyStmt  {  	NodeTag		type;  	char	   *conditionname;	/* condition name to notify */ +	char	   *payload;		/* the payload string, or NULL if none */  } NotifyStmt;  /* ---------------------- diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index f0beb20a24b..2ace9585009 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -7,7 +7,7 @@   * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group   * Portions Copyright (c) 1994, Regents of the University of California   * - * $PostgreSQL: pgsql/src/include/storage/lwlock.h,v 1.44 2010/02/07 20:48:13 tgl Exp $ + * $PostgreSQL: pgsql/src/include/storage/lwlock.h,v 1.45 2010/02/16 22:34:57 tgl Exp $   *   *-------------------------------------------------------------------------   */ @@ -68,6 +68,8 @@ typedef enum LWLockId  	AutovacuumScheduleLock,  	SyncScanLock,  	RelationMappingLock, + 	AsyncCtlLock, + 	AsyncQueueLock,  	/* Individual lock IDs end here */  	FirstBufMappingLock,  	FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS, diff --git a/src/test/regress/expected/guc.out b/src/test/regress/expected/guc.out index 83cacbdd209..59c120c99a3 100644 --- a/src/test/regress/expected/guc.out +++ b/src/test/regress/expected/guc.out @@ -532,9 +532,9 @@ CREATE TEMP TABLE tmp_foo (data text) ON COMMIT DELETE ROWS;  CREATE ROLE temp_reset_user;  SET SESSION AUTHORIZATION temp_reset_user;  -- look changes -SELECT relname FROM pg_listener; -  relname   ------------ +SELECT pg_listening_channels(); + pg_listening_channels  +-----------------------   foo_event  (1 row) @@ -571,9 +571,9 @@ SELECT current_user = 'temp_reset_user';  -- discard everything  DISCARD ALL;  -- look again -SELECT relname FROM pg_listener; - relname  ---------- +SELECT pg_listening_channels(); + pg_listening_channels  +-----------------------  (0 rows)  SELECT name FROM pg_prepared_statements; diff --git a/src/test/regress/expected/sanity_check.out b/src/test/regress/expected/sanity_check.out index 4dc59d9b5de..1d9e1100448 100644 --- a/src/test/regress/expected/sanity_check.out +++ b/src/test/regress/expected/sanity_check.out @@ -107,7 +107,6 @@ SELECT relname, relhasindex   pg_language             | t   pg_largeobject          | t   pg_largeobject_metadata | t - pg_listener             | f   pg_namespace            | t   pg_opclass              | t   pg_operator             | t @@ -154,7 +153,7 @@ SELECT relname, relhasindex   timetz_tbl              | f   tinterval_tbl           | f   varchar_tbl             | f -(143 rows) +(142 rows)  --  -- another sanity check: every system catalog that has OIDs should have diff --git a/src/test/regress/sql/guc.sql b/src/test/regress/sql/guc.sql index a7b795af47d..21ed86f26ba 100644 --- a/src/test/regress/sql/guc.sql +++ b/src/test/regress/sql/guc.sql @@ -165,7 +165,7 @@ CREATE TEMP TABLE tmp_foo (data text) ON COMMIT DELETE ROWS;  CREATE ROLE temp_reset_user;  SET SESSION AUTHORIZATION temp_reset_user;  -- look changes -SELECT relname FROM pg_listener; +SELECT pg_listening_channels();  SELECT name FROM pg_prepared_statements;  SELECT name FROM pg_cursors;  SHOW vacuum_cost_delay; @@ -174,7 +174,7 @@ SELECT current_user = 'temp_reset_user';  -- discard everything  DISCARD ALL;  -- look again -SELECT relname FROM pg_listener; +SELECT pg_listening_channels();  SELECT name FROM pg_prepared_statements;  SELECT name FROM pg_cursors;  SHOW vacuum_cost_delay; | 
