| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
 | /*-------------------------------------------------------------------------
 *
 * shm_mq.c
 *	  single-reader, single-writer shared memory message queue
 *
 * Both the sender and the receiver must have a PGPROC; their respective
 * process latches are used for synchronization.  Only the sender may send,
 * and only the receiver may receive.  This is intended to allow a user
 * backend to communicate with worker backends that it has registered.
 *
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/backend/storage/ipc/shm_mq.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "storage/procsignal.h"
#include "storage/shm_mq.h"
#include "storage/spin.h"
/*
 * This structure represents the actual queue, stored in shared memory.
 *
 * Some notes on synchronization:
 *
 * mq_receiver and mq_bytes_read can only be changed by the receiver; and
 * mq_sender and mq_bytes_written can only be changed by the sender.
 * mq_receiver and mq_sender are protected by mq_mutex, although, importantly,
 * they cannot change once set, and thus may be read without a lock once this
 * is known to be the case.
 *
 * mq_bytes_read and mq_bytes_written are not protected by the mutex.  Instead,
 * they are written atomically using 8 byte loads and stores.  Memory barriers
 * must be carefully used to synchronize reads and writes of these values with
 * reads and writes of the actual data in mq_ring.
 *
 * mq_detached needs no locking.  It can be set by either the sender or the
 * receiver, but only ever from false to true, so redundant writes don't
 * matter.  It is important that if we set mq_detached and then set the
 * counterparty's latch, the counterparty must be certain to see the change
 * after waking up.  Since SetLatch begins with a memory barrier and ResetLatch
 * ends with one, this should be OK.
 *
 * mq_ring_size and mq_ring_offset never change after initialization, and
 * can therefore be read without the lock.
 *
 * Importantly, mq_ring can be safely read and written without a lock.
 * At any given time, the difference between mq_bytes_read and
 * mq_bytes_written defines the number of bytes within mq_ring that contain
 * unread data, and mq_bytes_read defines the position where those bytes
 * begin.  The sender can increase the number of unread bytes at any time,
 * but only the receiver can give license to overwrite those bytes, by
 * incrementing mq_bytes_read.  Therefore, it's safe for the receiver to read
 * the unread bytes it knows to be present without the lock.  Conversely,
 * the sender can write to the unused portion of the ring buffer without
 * the lock, because nobody else can be reading or writing those bytes.  The
 * receiver could be making more bytes unused by incrementing mq_bytes_read,
 * but that's OK.  Note that it would be unsafe for the receiver to read any
 * data it's already marked as read, or to write any data; and it would be
 * unsafe for the sender to reread any data after incrementing
 * mq_bytes_written, but fortunately there's no need for any of that.
 */
struct shm_mq
{
	/* Spinlock protecting mq_receiver and mq_sender. */
	slock_t		mq_mutex;
	/* Receiving process; changed only by the receiver, immutable once set. */
	PGPROC	   *mq_receiver;
	/* Sending process; changed only by the sender, immutable once set. */
	PGPROC	   *mq_sender;
	/* Running count of bytes consumed; advanced only by the receiver. */
	pg_atomic_uint64 mq_bytes_read;
	/* Running count of bytes produced; advanced only by the sender. */
	pg_atomic_uint64 mq_bytes_written;
	/* Usable size of the ring buffer in bytes; fixed at creation. */
	Size		mq_ring_size;
	/* Set (false -> true only) by whichever side detaches; needs no lock. */
	bool		mq_detached;
	/*
	 * Gap between mq_ring and the MAXALIGN'd start of the ring data, as
	 * computed in shm_mq_create(); fixed at creation.
	 */
	uint8		mq_ring_offset;
	/* Ring buffer storage; read and written without holding mq_mutex. */
	char		mq_ring[FLEXIBLE_ARRAY_MEMBER];
};
/*
 * This structure is a backend-private handle for access to a queue.
 *
 * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
 * an optional pointer to the dynamic shared memory segment that contains it.
 * (If mqh_segment is provided, we register an on_dsm_detach callback to
 * make sure we detach from the queue before detaching from DSM.)
 *
 * If this queue is intended to connect the current process with a background
 * worker that started it, the user can pass a pointer to the worker handle
 * to shm_mq_attach(), and we'll store it in mqh_handle.  The point of this
 * is to allow us to begin sending to or receiving from that queue before the
 * process we'll be communicating with has even been started.  If it fails
 * to start, the handle will allow us to notice that and fail cleanly, rather
 * than waiting forever; see shm_mq_wait_internal.  This is mostly useful in
 * simple cases - e.g. where there are just 2 processes communicating; in
 * more complex scenarios, every process may not have a BackgroundWorkerHandle
 * available, or may need to watch for the failure of more than one other
 * process at a time.
 *
 * When a message exists as a contiguous chunk of bytes in the queue - that is,
 * it is smaller than the size of the ring buffer and does not wrap around
 * the end - we return the message to the caller as a pointer into the buffer.
 * For messages that are larger or happen to wrap, we reassemble the message
 * locally by copying the chunks into a backend-local buffer.  mqh_buffer is
 * the buffer, and mqh_buflen is the number of bytes allocated for it.
 *
 * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
 * are used to track the state of non-blocking operations.  When the caller
 * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
 * are expected to retry the call at a later time with the same argument;
 * we need to retain enough state to pick up where we left off.
 * mqh_length_word_complete tracks whether we are done sending or receiving
 * (whichever we're doing) the entire length word.  mqh_partial_bytes tracks
 * the number of bytes read or written for either the length word or the
 * message itself, and mqh_expected_bytes - which is used only for reads -
 * tracks the expected total size of the payload.
 *
 * mqh_counterparty_attached tracks whether we know the counterparty to have
 * attached to the queue at some previous point.  This lets us avoid some
 * mutex acquisitions.
 *
 * mqh_context is the memory context in effect at the time we attached to
 * the shm_mq.  The shm_mq_handle itself is allocated in this context, and
 * we make sure any other allocations we do happen in this context as well,
 * to avoid nasty surprises.
 */
struct shm_mq_handle
{
	/* The shared queue this handle is attached to. */
	shm_mq	   *mqh_queue;
	/* Optional DSM segment containing the queue, or NULL. */
	dsm_segment *mqh_segment;
	/* Optional handle for the counterparty background worker, or NULL. */
	BackgroundWorkerHandle *mqh_handle;
	/* Backend-local buffer used to reassemble split/wrapped messages. */
	char	   *mqh_buffer;
	/* Number of bytes currently allocated for mqh_buffer. */
	Size		mqh_buflen;
	/*
	 * Bytes the receiver has consumed locally but not yet reported through
	 * shm_mq_inc_bytes_read(); shm_mq_receive() flushes this once it exceeds
	 * a quarter of the ring size.
	 */
	Size		mqh_consume_pending;
	/* Bytes of the length word or payload transferred so far. */
	Size		mqh_partial_bytes;
	/* Expected total payload size; used only when receiving. */
	Size		mqh_expected_bytes;
	/* True once the whole length word has been sent or received. */
	bool		mqh_length_word_complete;
	/* True once the counterparty is known to have attached. */
	bool		mqh_counterparty_attached;
	/* Memory context that was current at attach time; used for allocations. */
	MemoryContext mqh_context;
};
/* Prototypes for helper functions that are private to this file. */
static void shm_mq_detach_internal(shm_mq *mq);
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes,
				  const void *data, bool nowait, Size *bytes_written);
static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh,
					 Size bytes_needed, bool nowait, Size *nbytesp,
					 void **datap);
static bool shm_mq_counterparty_gone(shm_mq *mq,
						 BackgroundWorkerHandle *handle);
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr,
					 BackgroundWorkerHandle *handle);
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n);
static void shm_mq_inc_bytes_written(shm_mq *mq, Size n);
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
/*
 * Minimum queue size is enough for header and at least one chunk of data:
 * the MAXALIGN'd header plus one MAXIMUM_ALIGNOF-sized unit of ring space.
 */
const Size	shm_mq_minimum_size =
MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;
/*
 * Initial allocation size, in bytes, for a handle's backend-local
 * reassembly buffer (mqh_buffer).
 */
#define MQH_INITIAL_BUFSIZE				8192
/*
 * Set up a brand-new shared message queue in caller-supplied memory.
 *
 * 'address' is where the queue header will live and 'size' is the number of
 * bytes available there.  Bytes past the last MAXALIGN boundary are ignored;
 * whatever remains after the MAXALIGN'd header becomes the ring buffer.
 * Returns 'address' as a shm_mq pointer.
 */
shm_mq *
shm_mq_create(void *address, Size size)
{
	const Size	header_len = MAXALIGN(offsetof(shm_mq, mq_ring));
	const Size	usable_len = MAXALIGN_DOWN(size);
	shm_mq	   *queue = (shm_mq *) address;

	/* There must be room for at least some ring data after the header. */
	Assert(usable_len > header_len);

	/* Nobody is attached yet, and nothing has been read or written. */
	SpinLockInit(&queue->mq_mutex);
	queue->mq_receiver = NULL;
	queue->mq_sender = NULL;
	queue->mq_detached = false;
	pg_atomic_init_u64(&queue->mq_bytes_read, 0);
	pg_atomic_init_u64(&queue->mq_bytes_written, 0);

	/* Record the ring geometry. */
	queue->mq_ring_size = usable_len - header_len;
	queue->mq_ring_offset = header_len - offsetof(shm_mq, mq_ring);

	return queue;
}
/*
 * Register 'proc' as the process that will receive from this queue.
 *
 * The receiver may be set only once.  If a sender has already been
 * registered, its latch is set so that it notices the receiver's arrival.
 */
void
shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
{
	PGPROC	   *peer;

	/* Publish the receiver and snapshot the sender under the mutex. */
	SpinLockAcquire(&mq->mq_mutex);
	Assert(mq->mq_receiver == NULL);
	mq->mq_receiver = proc;
	peer = mq->mq_sender;
	SpinLockRelease(&mq->mq_mutex);

	/* Nothing to wake if no sender has registered yet. */
	if (peer == NULL)
		return;

	SetLatch(&peer->procLatch);
}
/*
 * Register 'proc' as the process that will send to this queue.
 *
 * The sender may be set only once.  If a receiver has already been
 * registered, its latch is set so that it notices the sender's arrival.
 */
void
shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
{
	PGPROC	   *peer;

	/* Publish the sender and snapshot the receiver under the mutex. */
	SpinLockAcquire(&mq->mq_mutex);
	Assert(mq->mq_sender == NULL);
	mq->mq_sender = proc;
	peer = mq->mq_receiver;
	SpinLockRelease(&mq->mq_mutex);

	/* Nothing to wake if no receiver has registered yet. */
	if (peer == NULL)
		return;

	SetLatch(&peer->procLatch);
}
/*
 * Return the process registered as this queue's receiver, or NULL if none
 * has been set.  The field is read while holding the queue's mutex.
 */
PGPROC *
shm_mq_get_receiver(shm_mq *mq)
{
	PGPROC	   *snapshot;

	SpinLockAcquire(&mq->mq_mutex);
	snapshot = mq->mq_receiver;
	SpinLockRelease(&mq->mq_mutex);

	return snapshot;
}
/*
 * Return the process registered as this queue's sender, or NULL if none
 * has been set.  The field is read while holding the queue's mutex.
 */
PGPROC *
shm_mq_get_sender(shm_mq *mq)
{
	PGPROC	   *snapshot;

	SpinLockAcquire(&mq->mq_mutex);
	snapshot = mq->mq_sender;
	SpinLockRelease(&mq->mq_mutex);

	return snapshot;
}
/*
 * Attach to a shared message queue so we can send or receive messages.
 *
 * The memory context in effect at the time this function is called should
 * be one which will last for at least as long as the message queue itself.
 * We'll allocate the handle in that context, and future allocations that
 * are needed to buffer incoming data will happen in that context as well.
 *
 * If seg != NULL, the queue will be automatically detached when that dynamic
 * shared memory segment is detached.
 *
 * If handle != NULL, the queue can be read or written even before the
 * other process has attached.  We'll wait for it to do so if needed.  The
 * handle must be for a background worker initialized with bgw_notify_pid
 * equal to our PID.
 *
 * shm_mq_detach() should be called when done.  This will free the
 * shm_mq_handle and mark the queue itself as detached, so that our
 * counterpart won't get stuck waiting for us to fill or drain the queue
 * after we've already lost interest.
 */
shm_mq_handle *
shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
{
	shm_mq_handle *result;

	/* The handle lives in whatever memory context is current right now. */
	result = (shm_mq_handle *) palloc(sizeof(shm_mq_handle));

	/* The caller must already be registered as one end of the queue. */
	Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);

	/* What we are attached to, and where later allocations should go. */
	result->mqh_queue = mq;
	result->mqh_segment = seg;
	result->mqh_handle = handle;
	result->mqh_context = CurrentMemoryContext;

	/* No reassembly buffer until a message actually needs one. */
	result->mqh_buffer = NULL;
	result->mqh_buflen = 0;

	/* No transfer is in progress and nothing is known about the peer yet. */
	result->mqh_consume_pending = 0;
	result->mqh_partial_bytes = 0;
	result->mqh_expected_bytes = 0;
	result->mqh_length_word_complete = false;
	result->mqh_counterparty_attached = false;

	/* Without a segment there is no detach hook to register. */
	if (seg == NULL)
		return result;

	on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
	return result;
}
/*
 * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
 * been passed to shm_mq_attach.
 */
void
shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
{
	/* A worker handle may be attached only if none has been set already. */
	Assert(mqh->mqh_handle == NULL);
	mqh->mqh_handle = handle;
}
/*
 * Send one contiguous message through a shared message queue.
 *
 * This is a convenience wrapper: the buffer is described by a one-element
 * iovec and handed to shm_mq_sendv(), whose result is returned unchanged.
 */
shm_mq_result
shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
{
	shm_mq_iovec whole_message;

	whole_message.len = nbytes;
	whole_message.data = data;

	return shm_mq_sendv(mqh, &whole_message, 1, nowait);
}
/*
 * Write a message into a shared message queue, gathered from multiple
 * addresses.
 *
 * When nowait = false, we'll wait on our process latch when the ring buffer
 * fills up, and then continue writing once the receiver has drained some data.
 * The process latch is reset after each wait.
 *
 * When nowait = true, we do not manipulate the state of the process latch;
 * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK.  In
 * this case, the caller should call this function again, with the same
 * arguments, each time the process latch is set.  (Once begun, the sending
 * of a message cannot be aborted except by detaching from the queue; changing
 * the length or payload will corrupt the queue.)
 */
shm_mq_result
shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
{
	shm_mq_result res;
	shm_mq	   *mq = mqh->mqh_queue;
	PGPROC	   *receiver;
	Size		nbytes = 0;
	Size		bytes_written;
	int			i;
	int			which_iov = 0;
	Size		offset;
	/* Only the registered sender may write to the queue. */
	Assert(mq->mq_sender == MyProc);
	/* Compute total size of write. */
	for (i = 0; i < iovcnt; ++i)
		nbytes += iov[i].len;
	/* Try to write, or finish writing, the length word into the buffer. */
	while (!mqh->mqh_length_word_complete)
	{
		Assert(mqh->mqh_partial_bytes < sizeof(Size));
		/* Send whatever part of the length word has not gone out yet. */
		res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
								((char *) &nbytes) + mqh->mqh_partial_bytes,
								nowait, &bytes_written);
		if (res == SHM_MQ_DETACHED)
		{
			/* Reset state in case caller tries to send another message. */
			mqh->mqh_partial_bytes = 0;
			mqh->mqh_length_word_complete = false;
			return res;
		}
		mqh->mqh_partial_bytes += bytes_written;
		if (mqh->mqh_partial_bytes >= sizeof(Size))
		{
			/*
			 * Length word is fully queued; from here on mqh_partial_bytes
			 * counts payload bytes instead.
			 */
			Assert(mqh->mqh_partial_bytes == sizeof(Size));
			mqh->mqh_partial_bytes = 0;
			mqh->mqh_length_word_complete = true;
		}
		/* Any other non-success result is passed back with state retained. */
		if (res != SHM_MQ_SUCCESS)
			return res;
		/* Length word can't be split unless bigger than required alignment. */
		Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
	}
	/* Write the actual data bytes into the buffer. */
	Assert(mqh->mqh_partial_bytes <= nbytes);
	/*
	 * Resume from the payload position recorded in the handle: which_iov
	 * starts at 0 and offset is the count of payload bytes already queued,
	 * so the loop below first walks forward to the iovec containing it.
	 */
	offset = mqh->mqh_partial_bytes;
	do
	{
		Size		chunksize;
		/* Figure out which bytes need to be sent next. */
		if (offset >= iov[which_iov].len)
		{
			offset -= iov[which_iov].len;
			++which_iov;
			if (which_iov >= iovcnt)
				break;
			continue;
		}
		/*
		 * We want to avoid copying the data if at all possible, but every
		 * chunk of bytes we write into the queue has to be MAXALIGN'd, except
		 * the last.  Thus, if a chunk other than the last one ends on a
		 * non-MAXALIGN'd boundary, we have to combine the tail end of its
		 * data with data from one or more following chunks until we either
		 * reach the last chunk or accumulate a number of bytes which is
		 * MAXALIGN'd.
		 */
		if (which_iov + 1 < iovcnt &&
			offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
		{
			char		tmpbuf[MAXIMUM_ALIGNOF];
			int			j = 0;
			/*
			 * Gather bytes one at a time, crossing iovec boundaries as
			 * needed, until tmpbuf is full or the input is exhausted.
			 */
			for (;;)
			{
				if (offset < iov[which_iov].len)
				{
					tmpbuf[j] = iov[which_iov].data[offset];
					j++;
					offset++;
					if (j == MAXIMUM_ALIGNOF)
						break;
				}
				else
				{
					offset -= iov[which_iov].len;
					which_iov++;
					if (which_iov >= iovcnt)
						break;
				}
			}
			res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
			if (res == SHM_MQ_DETACHED)
			{
				/* Reset state in case caller tries to send another message. */
				mqh->mqh_partial_bytes = 0;
				mqh->mqh_length_word_complete = false;
				return res;
			}
			mqh->mqh_partial_bytes += bytes_written;
			if (res != SHM_MQ_SUCCESS)
				return res;
			continue;
		}
		/*
		 * If this is the last chunk, we can write all the data, even if it
		 * isn't a multiple of MAXIMUM_ALIGNOF.  Otherwise, we need to
		 * MAXALIGN_DOWN the write size.
		 */
		chunksize = iov[which_iov].len - offset;
		if (which_iov + 1 < iovcnt)
			chunksize = MAXALIGN_DOWN(chunksize);
		res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
								nowait, &bytes_written);
		if (res == SHM_MQ_DETACHED)
		{
			/* Reset state in case caller tries to send another message. */
			mqh->mqh_length_word_complete = false;
			mqh->mqh_partial_bytes = 0;
			return res;
		}
		/* Advance both the persistent progress and the local position. */
		mqh->mqh_partial_bytes += bytes_written;
		offset += bytes_written;
		if (res != SHM_MQ_SUCCESS)
			return res;
	} while (mqh->mqh_partial_bytes < nbytes);
	/* Reset for next message. */
	mqh->mqh_partial_bytes = 0;
	mqh->mqh_length_word_complete = false;
	/* If queue has been detached, let caller know. */
	if (mq->mq_detached)
		return SHM_MQ_DETACHED;
	/*
	 * If the counterparty is known to have attached, we can read mq_receiver
	 * without acquiring the spinlock and assume it isn't NULL.  Otherwise,
	 * more caution is needed.
	 */
	if (mqh->mqh_counterparty_attached)
		receiver = mq->mq_receiver;
	else
	{
		SpinLockAcquire(&mq->mq_mutex);
		receiver = mq->mq_receiver;
		SpinLockRelease(&mq->mq_mutex);
		/* No receiver registered yet, so there is nobody to wake. */
		if (receiver == NULL)
			return SHM_MQ_SUCCESS;
		mqh->mqh_counterparty_attached = true;
	}
	/* Notify receiver of the newly-written data, and return. */
	SetLatch(&receiver->procLatch);
	return SHM_MQ_SUCCESS;
}
/*
 * Receive a message from a shared message queue.
 *
 * We set *nbytes to the message length and *data to point to the message
 * payload.  If the entire message exists in the queue as a single,
 * contiguous chunk, *data will point directly into shared memory; otherwise,
 * it will point to a temporary buffer.  This mostly avoids data copying in
 * the hoped-for case where messages are short compared to the buffer size,
 * while still allowing longer messages.  In either case, the return value
 * remains valid until the next receive operation is performed on the queue.
 *
 * When nowait = false, we'll wait on our process latch when the ring buffer
 * is empty and we have not yet received a full message.  The sender will
 * set our process latch after more data has been written, and we'll resume
 * processing.  Each call will therefore return a complete message
 * (unless the sender detaches the queue).
 *
 * When nowait = true, we do not manipulate the state of the process latch;
 * instead, whenever the buffer is empty and we need to read from it, we
 * return SHM_MQ_WOULD_BLOCK.  In this case, the caller should call this
 * function again after the process latch has been set.
 */
shm_mq_result
shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
{
	shm_mq	   *mq = mqh->mqh_queue;
	shm_mq_result res;
	Size		rb = 0;
	Size		nbytes;
	void	   *rawdata = NULL;

	/* Only the registered receiver may read from the queue. */
	Assert(mq->mq_receiver == MyProc);

	/* We can't receive data until the sender has attached. */
	if (!mqh->mqh_counterparty_attached)
	{
		if (nowait)
		{
			bool		counterparty_gone;

			/*
			 * We shouldn't return at this point at all unless the sender
			 * hasn't attached yet.  However, the correct return value depends
			 * on whether the sender is still attached.  If we first test
			 * whether the sender has ever attached and then test whether the
			 * sender has detached, there's a race condition: a sender that
			 * attaches and detaches very quickly might fool us into thinking
			 * the sender never attached at all.  So, test whether our
			 * counterparty is definitively gone first, and only afterwards
			 * check whether the sender ever attached in the first place.
			 */
			counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
			if (shm_mq_get_sender(mq) == NULL)
			{
				if (counterparty_gone)
					return SHM_MQ_DETACHED;
				else
					return SHM_MQ_WOULD_BLOCK;
			}
		}
		else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
				 && shm_mq_get_sender(mq) == NULL)
		{
			mq->mq_detached = true;
			return SHM_MQ_DETACHED;
		}
		mqh->mqh_counterparty_attached = true;
	}

	/*
	 * If we've consumed an amount of data greater than 1/4th of the ring
	 * size, mark it consumed in shared memory.  We try to avoid doing this
	 * unnecessarily when only a small amount of data has been consumed,
	 * because SetLatch() is fairly expensive and we don't want to do it too
	 * often.
	 */
	if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
	{
		shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
		mqh->mqh_consume_pending = 0;
	}

	/* Try to read, or finish reading, the length word from the buffer. */
	while (!mqh->mqh_length_word_complete)
	{
		/* Try to receive the message length word. */
		Assert(mqh->mqh_partial_bytes < sizeof(Size));
		res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
								   nowait, &rb, &rawdata);
		if (res != SHM_MQ_SUCCESS)
			return res;

		/*
		 * Hopefully, we'll receive the entire message length word at once.
		 * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
		 * multiple reads.
		 */
		if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
		{
			Size		needed;

			nbytes = *(Size *) rawdata;

			/* If we've already got the whole message, we're done. */
			needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
			if (rb >= needed)
			{
				mqh->mqh_consume_pending += needed;
				*nbytesp = nbytes;
				*datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
				return SHM_MQ_SUCCESS;
			}

			/*
			 * We don't have the whole message, but we at least have the whole
			 * length word.
			 */
			mqh->mqh_expected_bytes = nbytes;
			mqh->mqh_length_word_complete = true;
			mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
			rb -= MAXALIGN(sizeof(Size));
		}
		else
		{
			Size		lengthbytes;

			/* Can't be split unless bigger than required alignment. */
			Assert(sizeof(Size) > MAXIMUM_ALIGNOF);

			/* Message word is split; need buffer to reassemble. */
			if (mqh->mqh_buffer == NULL)
			{
				mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
													 MQH_INITIAL_BUFSIZE);
				mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
			}
			Assert(mqh->mqh_buflen >= sizeof(Size));

			/* Copy partial length word; remember to consume it. */
			if (mqh->mqh_partial_bytes + rb > sizeof(Size))
				lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
			else
				lengthbytes = rb;
			memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
				   lengthbytes);
			mqh->mqh_partial_bytes += lengthbytes;
			mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
			rb -= lengthbytes;

			/* If we now have the whole word, we're ready to read payload. */
			if (mqh->mqh_partial_bytes >= sizeof(Size))
			{
				Assert(mqh->mqh_partial_bytes == sizeof(Size));
				mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
				mqh->mqh_length_word_complete = true;
				mqh->mqh_partial_bytes = 0;
			}
		}
	}
	nbytes = mqh->mqh_expected_bytes;

	if (mqh->mqh_partial_bytes == 0)
	{
		/*
		 * Try to obtain the whole message in a single chunk.  If this works,
		 * we need not copy the data and can return a pointer directly into
		 * shared memory.
		 */
		res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
		if (res != SHM_MQ_SUCCESS)
			return res;
		if (rb >= nbytes)
		{
			mqh->mqh_length_word_complete = false;
			mqh->mqh_consume_pending += MAXALIGN(nbytes);
			*nbytesp = nbytes;
			*datap = rawdata;
			return SHM_MQ_SUCCESS;
		}

		/*
		 * The message has wrapped the buffer.  We'll need to copy it in order
		 * to return it to the client in one chunk.  First, make sure we have
		 * a large enough buffer available.
		 */
		if (mqh->mqh_buflen < nbytes)
		{
			Size		newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);

			/*
			 * Double the size until it is large enough.  If another doubling
			 * would overflow Size (possible only with an absurd length word),
			 * stop and request exactly nbytes instead: wrapping around to
			 * zero would make this loop spin forever.  The allocator is then
			 * expected to reject the oversized request with an error.
			 */
			while (newbuflen < nbytes)
			{
				if (newbuflen > ((Size) -1) / 2)
				{
					newbuflen = nbytes;
					break;
				}
				newbuflen *= 2;
			}

			/*
			 * Drop the old buffer first, clearing the handle's fields so that
			 * it stays consistent if the allocation below fails.
			 */
			if (mqh->mqh_buffer != NULL)
			{
				pfree(mqh->mqh_buffer);
				mqh->mqh_buffer = NULL;
				mqh->mqh_buflen = 0;
			}
			mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
			mqh->mqh_buflen = newbuflen;
		}
	}

	/* Loop until we've copied the entire message. */
	for (;;)
	{
		Size		still_needed;

		/*
		 * Copy as much as we can.  When we are resuming a partially copied
		 * message, no bytes have been fetched yet on this call (rb is zero
		 * and rawdata has not been set), so skip the copy in that case.
		 */
		Assert(mqh->mqh_partial_bytes + rb <= nbytes);
		if (rb > 0)
			memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
		mqh->mqh_partial_bytes += rb;

		/*
		 * Update count of bytes that can be consumed, accounting for
		 * alignment padding.  Note that this will never actually insert any
		 * padding except at the end of a message, because the buffer size is
		 * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
		 */
		Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
		mqh->mqh_consume_pending += MAXALIGN(rb);

		/* If we got all the data, exit the loop. */
		if (mqh->mqh_partial_bytes >= nbytes)
			break;

		/* Wait for some more data. */
		still_needed = nbytes - mqh->mqh_partial_bytes;
		res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
		if (res != SHM_MQ_SUCCESS)
			return res;

		/* Never copy past the end of this message. */
		if (rb > still_needed)
			rb = still_needed;
	}

	/* Return the complete message, and reset for next message. */
	*nbytesp = nbytes;
	*datap = mqh->mqh_buffer;
	mqh->mqh_length_word_complete = false;
	mqh->mqh_partial_bytes = 0;
	return SHM_MQ_SUCCESS;
}
/*
 * Wait for the other process that's supposed to use this queue to attach
 * to it.
 *
 * The return value is SHM_MQ_DETACHED if the worker has already detached or
 * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
 * Note that we will only be able to detect that the worker has died before
 * attaching if a background worker handle was passed to shm_mq_attach().
 */
shm_mq_result
shm_mq_wait_for_attach(shm_mq_handle *mqh)
{
	shm_mq	   *queue = mqh->mqh_queue;
	PGPROC	  **peer_slot;
	bool		attached;

	/* Watch whichever end of the queue is not ours. */
	if (shm_mq_get_receiver(queue) != MyProc)
	{
		Assert(shm_mq_get_sender(queue) == MyProc);
		peer_slot = &queue->mq_receiver;
	}
	else
		peer_slot = &queue->mq_sender;

	/* Translate the helper's boolean outcome into a queue result code. */
	attached = shm_mq_wait_internal(queue, peer_slot, mqh->mqh_handle);
	return attached ? SHM_MQ_SUCCESS : SHM_MQ_DETACHED;
}
/*
 * Leave a shared message queue and discard the backend-local handle.
 *
 * Once this returns, 'mqh' has been freed and must not be used again.
 */
void
shm_mq_detach(shm_mq_handle *mqh)
{
	shm_mq	   *queue = mqh->mqh_queue;
	dsm_segment *segment = mqh->mqh_segment;
	char	   *reassembly_buf = mqh->mqh_buffer;

	/* Tell the other side we are gone so that it doesn't wait on us. */
	shm_mq_detach_internal(queue);

	/* Having detached by hand, drop the on_dsm_detach hook if one exists. */
	if (segment != NULL)
		cancel_on_dsm_detach(segment,
							 shm_mq_detach_callback,
							 PointerGetDatum(queue));

	/* Free the reassembly buffer, if one was allocated, then the handle. */
	if (reassembly_buf != NULL)
		pfree(reassembly_buf);
	pfree(mqh);
}
/*
 * Notify counterparty that we're detaching from shared message queue.
 *
 * The purpose of this function is to make sure that the process
 * with which we're communicating doesn't block forever waiting for us to
 * fill or drain the queue once we've lost interest.  When the sender
 * detaches, the receiver can read any messages remaining in the queue;
 * further reads will return SHM_MQ_DETACHED.  If the receiver detaches,
 * further attempts to send messages will likewise return SHM_MQ_DETACHED.
 *
 * This is separated out from shm_mq_detach() because if the on_dsm_detach
 * callback fires, we only want to do this much.  We do not try to touch
 * the local shm_mq_handle, as it may have been pfree'd already.
 */
static void
shm_mq_detach_internal(shm_mq *mq)
{
	PGPROC	   *peer;

	/* Under the mutex, find our counterparty and flag the queue detached. */
	SpinLockAcquire(&mq->mq_mutex);
	if (mq->mq_sender != MyProc)
	{
		Assert(mq->mq_receiver == MyProc);
		peer = mq->mq_sender;
	}
	else
		peer = mq->mq_receiver;
	mq->mq_detached = true;
	SpinLockRelease(&mq->mq_mutex);

	/* No counterparty registered means there is nobody to wake. */
	if (peer == NULL)
		return;

	SetLatch(&peer->procLatch);
}
/*
 * Get the shm_mq from handle.
 *
 * Returns the shared queue pointer stored in the handle's mqh_queue field;
 * the handle itself is not modified.
 */
shm_mq *
shm_mq_get_queue(shm_mq_handle *mqh)
{
	return mqh->mqh_queue;
}
/*
 * Write bytes into a shared message queue.
 *
 * Copies up to nbytes bytes from data into the queue's ring buffer, looping
 * until everything has been written or the attempt has to be abandoned.
 * On every return path, *bytes_written is set to the number of bytes of
 * data copied so far (alignment padding is not counted).
 *
 * Returns SHM_MQ_SUCCESS once all nbytes have been copied, SHM_MQ_DETACHED
 * if the queue is detached (or the receiver is found to be gone), and, when
 * nowait is true, SHM_MQ_WOULD_BLOCK if the ring is full.  When nowait is
 * false and the ring is full, we wait on our latch instead.
 */
static shm_mq_result
shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
				  bool nowait, Size *bytes_written)
{
	shm_mq	   *mq = mqh->mqh_queue;
	Size		sent = 0;
	uint64		used;
	Size		ringsize = mq->mq_ring_size;
	Size		available;
	while (sent < nbytes)
	{
		uint64		rb;
		uint64		wb;
		/*
		 * Compute number of ring buffer bytes used and available.  Since
		 * sent < nbytes here, available is zero only when the ring is full.
		 */
		rb = pg_atomic_read_u64(&mq->mq_bytes_read);
		wb = pg_atomic_read_u64(&mq->mq_bytes_written);
		Assert(wb >= rb);
		used = wb - rb;
		Assert(used <= ringsize);
		available = Min(ringsize - used, nbytes - sent);
		/*
		 * Bail out if the queue has been detached.  Note that we would be in
		 * trouble if the compiler decided to cache the value of
		 * mq->mq_detached in a register or on the stack across loop
		 * iterations.  It probably shouldn't do that anyway since we'll
		 * always return, call an external function that performs a system
		 * call, or reach a memory barrier at some point later in the loop,
		 * but just to be sure, insert a compiler barrier here.
		 */
		pg_compiler_barrier();
		if (mq->mq_detached)
		{
			*bytes_written = sent;
			return SHM_MQ_DETACHED;
		}
		if (available == 0 && !mqh->mqh_counterparty_attached)
		{
			/*
			 * The queue is full, so if the receiver isn't yet known to be
			 * attached, we must wait for that to happen.
			 */
			if (nowait)
			{
				/* Can't wait: report detach or would-block as appropriate. */
				if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
				{
					*bytes_written = sent;
					return SHM_MQ_DETACHED;
				}
				if (shm_mq_get_receiver(mq) == NULL)
				{
					*bytes_written = sent;
					return SHM_MQ_WOULD_BLOCK;
				}
			}
			else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
										   mqh->mqh_handle))
			{
				/* Waiting for the receiver failed; record the detach. */
				mq->mq_detached = true;
				*bytes_written = sent;
				return SHM_MQ_DETACHED;
			}
			/* Remember the attach so later iterations skip this branch. */
			mqh->mqh_counterparty_attached = true;
			/*
			 * The receiver may have read some data after attaching, so we
			 * must not wait without rechecking the queue state.
			 */
		}
		else if (available == 0)
		{
			/*
			 * Since mqh->mqh_counterparty_attached is known to be true at
			 * this point, mq_receiver has been set, and it can't change once
			 * set.  Therefore, we can read it without acquiring the spinlock.
			 * Setting the receiver's latch prompts it to drain the full ring.
			 */
			Assert(mqh->mqh_counterparty_attached);
			SetLatch(&mq->mq_receiver->procLatch);
			/* Skip manipulation of our latch if nowait = true. */
			if (nowait)
			{
				*bytes_written = sent;
				return SHM_MQ_WOULD_BLOCK;
			}
			/*
			 * Wait for our latch to be set.  It might already be set for some
			 * unrelated reason, but that'll just result in one extra trip
			 * through the loop.  It's worth it to avoid resetting the latch
			 * at top of loop, because setting an already-set latch is much
			 * cheaper than setting one that has been reset.
			 */
			WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_SEND);
			/* Reset the latch so we don't spin. */
			ResetLatch(MyLatch);
			/* An interrupt may have occurred while we were waiting. */
			CHECK_FOR_INTERRUPTS();
		}
		else
		{
			Size		offset;
			Size		sendnow;
			/*
			 * There is room.  Limit this copy to the contiguous space between
			 * the current write position and the end of the ring; any
			 * remainder is handled by the next loop iteration.
			 */
			offset = wb % (uint64) ringsize;
			sendnow = Min(available, ringsize - offset);
			/*
			 * Write as much data as we can via a single memcpy(). Make sure
			 * these writes happen after the read of mq_bytes_read, above.
			 * This barrier pairs with the one in shm_mq_inc_bytes_read.
			 * (Since we're separating the read of mq_bytes_read from a
			 * subsequent write to mq_ring, we need a full barrier here.)
			 */
			pg_memory_barrier();
			memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
				   (char *) data + sent, sendnow);
			sent += sendnow;
			/*
			 * Update count of bytes written, with alignment padding.  Note
			 * that this will never actually insert any padding except at the
			 * end of a run of bytes, because the buffer size is a multiple of
			 * MAXIMUM_ALIGNOF, and each read is as well.
			 */
			Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
			shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
			/*
			 * For efficiency, we don't set the reader's latch here.  We'll do
			 * that only when the buffer fills up or after writing an entire
			 * message.
			 */
		}
	}
	*bytes_written = sent;
	return SHM_MQ_SUCCESS;
}
/*
 * Wait until at least bytes_needed bytes are available to be read from the
 * shared message queue, or until the buffer wraps around.  If the queue is
 * detached, returns SHM_MQ_DETACHED.  If nowait is specified and a wait
 * would be required, returns SHM_MQ_WOULD_BLOCK.  Otherwise, *datap is set
 * to the location at which data bytes can be read, *nbytesp is set to the
 * number of bytes which can be read at that address, and the return value
 * is SHM_MQ_SUCCESS.
 *
 * *nbytesp and *datap are written only on the SHM_MQ_SUCCESS path.  Bytes
 * counted in mqh->mqh_consume_pending are treated as already read when
 * computing what is available; before waiting or returning
 * SHM_MQ_WOULD_BLOCK, that pending count is published via
 * shm_mq_inc_bytes_read and reset to zero.
 */
static shm_mq_result
shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait,
					 Size *nbytesp, void **datap)
{
	shm_mq	   *mq = mqh->mqh_queue;
	Size		ringsize = mq->mq_ring_size;
	uint64		used;
	uint64		written;
	for (;;)
	{
		Size		offset;
		uint64		read;
		/* Get bytes written, so we can compute what's available to read. */
		written = pg_atomic_read_u64(&mq->mq_bytes_written);
		/*
		 * Get bytes read.  Include bytes we could consume but have not yet
		 * consumed.
		 */
		read = pg_atomic_read_u64(&mq->mq_bytes_read) +
			mqh->mqh_consume_pending;
		used = written - read;
		Assert(used <= ringsize);
		offset = read % (uint64) ringsize;
		/* If we have enough data or buffer has wrapped, we're done. */
		if (used >= bytes_needed || offset + used >= ringsize)
		{
			/* Hand back only the contiguous run up to the end of the ring. */
			*nbytesp = Min(used, ringsize - offset);
			*datap = &mq->mq_ring[mq->mq_ring_offset + offset];
			/*
			 * Separate the read of mq_bytes_written, above, from caller's
			 * attempt to read the data itself.  Pairs with the barrier in
			 * shm_mq_inc_bytes_written.
			 */
			pg_read_barrier();
			return SHM_MQ_SUCCESS;
		}
		/*
		 * Fall out before waiting if the queue has been detached.
		 *
		 * Note that we don't check for this until *after* considering whether
		 * the data already available is enough, since the receiver can finish
		 * receiving a message stored in the buffer even after the sender has
		 * detached.
		 */
		if (mq->mq_detached)
		{
			/*
			 * If the writer advanced mq_bytes_written and then set
			 * mq_detached, we might not have read the final value of
			 * mq_bytes_written above.  Insert a read barrier and then check
			 * again if mq_bytes_written has advanced.
			 */
			pg_read_barrier();
			if (written != pg_atomic_read_u64(&mq->mq_bytes_written))
				continue;
			return SHM_MQ_DETACHED;
		}
		/*
		 * We didn't get enough data to satisfy the request, so mark any data
		 * previously-consumed as read to make more buffer space.
		 */
		if (mqh->mqh_consume_pending > 0)
		{
			shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
			mqh->mqh_consume_pending = 0;
		}
		/* Skip manipulation of our latch if nowait = true. */
		if (nowait)
			return SHM_MQ_WOULD_BLOCK;
		/*
		 * Wait for our latch to be set.  It might already be set for some
		 * unrelated reason, but that'll just result in one extra trip through
		 * the loop.  It's worth it to avoid resetting the latch at top of
		 * loop, because setting an already-set latch is much cheaper than
		 * setting one that has been reset.
		 */
		WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_RECEIVE);
		/* Reset the latch so we don't spin. */
		ResetLatch(MyLatch);
		/* An interrupt may have occurred while we were waiting. */
		CHECK_FOR_INTERRUPTS();
	}
}
/*
 * Decide whether a counterparty, which may not even have started yet, is
 * definitely gone.
 *
 * Returns true when the queue is already marked detached, or when a
 * background worker handle is supplied and GetBackgroundWorkerPid reports
 * a status other than BGWH_STARTED or BGWH_NOT_YET_STARTED; in the latter
 * case the queue is also marked detached.  Returns false otherwise, which
 * only means the counterparty is not known to be gone.
 */
static bool
shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
{
	BgwHandleStatus worker_state;
	pid_t		worker_pid;

	/* A detached queue settles the question immediately. */
	if (mq->mq_detached)
		return true;

	/* Without a handle there is no worker status to consult. */
	if (handle == NULL)
		return false;

	/* A worker that is running, or still to be started, is not gone. */
	worker_state = GetBackgroundWorkerPid(handle, &worker_pid);
	if (worker_state == BGWH_STARTED || worker_state == BGWH_NOT_YET_STARTED)
		return false;

	/* Unexpected worker death: make the detach official. */
	mq->mq_detached = true;
	return true;
}
/*
 * Block until our counterpart attaches to the queue.
 *
 * ptr points at the queue field that is expected to become non-NULL once
 * the counterpart has attached; it is examined while holding the queue's
 * spinlock.
 *
 * Returns true once *ptr is non-NULL and the queue is not detached.
 * Returns false if the queue is detached, or if handle != NULL and
 * GetBackgroundWorkerPid reports a status other than BGWH_STARTED or
 * BGWH_NOT_YET_STARTED.  With handle == NULL there is no way to notice
 * that the counterpart will never show up, so this may wait indefinitely;
 * interrupts are still checked on every wakeup.
 */
static bool
shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
{
	for (;;)
	{
		bool		attached;

		/* Hold the lock only for as long as it takes to look at *ptr. */
		SpinLockAcquire(&mq->mq_mutex);
		attached = (*ptr != NULL);
		SpinLockRelease(&mq->mq_mutex);

		/* Detach takes precedence over a successful attach. */
		if (mq->mq_detached)
			return false;
		if (attached)
			return true;

		if (handle != NULL)
		{
			BgwHandleStatus status;
			pid_t		pid;

			/* Give up if the worker died unexpectedly. */
			status = GetBackgroundWorkerPid(handle, &pid);
			if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
				return false;
		}

		/* Sleep until somebody sets our latch. */
		WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_INTERNAL);

		/* Clear the latch so the next wait really waits. */
		ResetLatch(MyLatch);

		/* The wakeup may have been caused by a pending interrupt. */
		CHECK_FOR_INTERRUPTS();
	}
}
/*
 * Increment the number of bytes read.
 *
 * Adds n to mq->mq_bytes_read and then sets the sender's latch so that it
 * re-examines the queue.
 */
static void
shm_mq_inc_bytes_read(shm_mq *mq, Size n)
{
	PGPROC	   *sender;
	/*
	 * Separate prior reads of mq_ring from the increment of mq_bytes_read
	 * which follows.  Pairs with the full barrier in shm_mq_send_bytes(). We
	 * only need a read barrier here because the increment of mq_bytes_read is
	 * actually a read followed by a dependent write.
	 */
	pg_read_barrier();
	/*
	 * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
	 * else can be changing this value.  This method should be cheaper.
	 */
	pg_atomic_write_u64(&mq->mq_bytes_read,
						pg_atomic_read_u64(&mq->mq_bytes_read) + n);
	/*
	 * We shouldn't have any bytes to read without a sender, so we can read
	 * mq_sender here without a lock.  Once it's initialized, it can't change.
	 */
	sender = mq->mq_sender;
	Assert(sender != NULL);
	/* Wake the sender in case it is waiting for buffer space. */
	SetLatch(&sender->procLatch);
}
/*
 * Increment the number of bytes written.
 *
 * Adds n to mq->mq_bytes_written.  The receiver's latch is not set here;
 * that is left to the caller.
 */
static void
shm_mq_inc_bytes_written(shm_mq *mq, Size n)
{
	/*
	 * Separate prior writes to mq_ring from the write of mq_bytes_written
	 * which we're about to do.  Pairs with the read barrier found in
	 * shm_mq_receive_bytes.
	 */
	pg_write_barrier();
	/*
	 * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
	 * else can be changing this value.  This method avoids taking the bus
	 * lock unnecessarily.
	 */
	pg_atomic_write_u64(&mq->mq_bytes_written,
						pg_atomic_read_u64(&mq->mq_bytes_written) + n);
}
/*
 * Shim with the on_dsm_detach callback signature.  The Datum argument
 * carries the shm_mq pointer, which is handed to shm_mq_detach_internal;
 * the segment argument is unused.
 */
static void
shm_mq_detach_callback(dsm_segment *seg, Datum arg)
{
	shm_mq_detach_internal((shm_mq *) DatumGetPointer(arg));
}
 |