/*-------------------------------------------------------------------------
 *
 * clog.c
 *		PostgreSQL transaction-commit-log manager
 *
 * This module replaces the old "pg_log" access code, which treated pg_log
 * essentially like a relation, in that it went through the regular buffer
 * manager.  The problem with that was that there wasn't any good way to
 * recycle storage space for transactions so old that they'll never be
 * looked up again.  Now we use specialized access code so that the commit
 * log can be broken into relatively small, independent segments.
 *
 * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * $Header: /cvsroot/pgsql/src/backend/access/transam/clog.c,v 1.11 2002/09/26 22:58:33 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"
#include <fcntl.h>
#include <dirent.h>
#include <errno.h>
#include <sys/stat.h>
#include <unistd.h>
#include "access/clog.h"
#include "storage/lwlock.h"
#include "miscadmin.h"
/*
 * Defines for CLOG page and segment sizes.  A page is the same BLCKSZ
 * as is used everywhere else in Postgres.	The CLOG segment size can be
 * chosen somewhat arbitrarily; we make it 1 million transactions by default,
 * or 256Kb.
 *
 * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
 * CLOG page numbering also wraps around at 0xFFFFFFFF/CLOG_XACTS_PER_PAGE,
 * and CLOG segment numbering at 0xFFFFFFFF/CLOG_XACTS_PER_SEGMENT.  We need
 * take no explicit notice of that fact in this module, except when comparing
 * segment and page numbers in TruncateCLOG (see CLOGPagePrecedes).
 */
/* A CLOG page is the same size as an ordinary Postgres disk block */
#define CLOG_BLCKSZ			BLCKSZ
/* We need two bits per xact, so four xacts fit in a byte */
#define CLOG_BITS_PER_XACT	2
#define CLOG_XACTS_PER_BYTE 4
/* Number of transaction status entries stored on one CLOG page */
#define CLOG_XACTS_PER_PAGE (CLOG_BLCKSZ * CLOG_XACTS_PER_BYTE)
/* Mask selecting the low-order CLOG_BITS_PER_XACT bits of a value */
#define CLOG_XACT_BITMASK	((1 << CLOG_BITS_PER_XACT) - 1)
/* Transactions covered by one segment file, and the same span in pages */
#define CLOG_XACTS_PER_SEGMENT	0x100000
#define CLOG_PAGES_PER_SEGMENT	(CLOG_XACTS_PER_SEGMENT / CLOG_XACTS_PER_PAGE)
/*
 * Address arithmetic for locating a transaction's status bits:
 *	TransactionIdToPage		- CLOG page number holding the xid
 *	TransactionIdToPgIndex	- index of the xid among the entries of its page
 *	TransactionIdToByte		- byte offset within the page
 *	TransactionIdToBIndex	- index of the xid within that byte (multiply by
 *							  CLOG_BITS_PER_XACT to get the bit shift)
 */
#define TransactionIdToPage(xid)	((xid) / (TransactionId) CLOG_XACTS_PER_PAGE)
#define TransactionIdToPgIndex(xid) ((xid) % (TransactionId) CLOG_XACTS_PER_PAGE)
#define TransactionIdToByte(xid)	(TransactionIdToPgIndex(xid) / CLOG_XACTS_PER_BYTE)
#define TransactionIdToBIndex(xid)	((xid) % (TransactionId) CLOG_XACTS_PER_BYTE)
/*----------
 * Shared-memory data structures for CLOG control
 *
 * We use a simple least-recently-used scheme to manage a pool of page
 * buffers for the CLOG.  Under ordinary circumstances we expect that write
 * traffic will occur mostly to the latest CLOG page (and to the just-prior
 * page, soon after a page transition).  Read traffic will probably touch
 * a larger span of pages, but in any case a fairly small number of page
 * buffers should be sufficient.  So, we just search the buffers using plain
 * linear search; there's no need for a hashtable or anything fancy.
 * The management algorithm is straight LRU except that we will never swap
 * out the latest page (since we know it's going to be hit again eventually).
 *
 * We use an overall LWLock to protect the shared data structures, plus
 * per-buffer LWLocks that synchronize I/O for each buffer.  A process
 * that is reading in or writing out a page buffer does not hold the control
 * lock, only the per-buffer lock for the buffer it is working on.
 *
 * To change the page number or state of a buffer, one must normally hold
 * the control lock.  (The sole exception to this rule is that a writer
 * process changes the state from DIRTY to WRITE_IN_PROGRESS while holding
 * only the per-buffer lock.)  If the buffer's state is neither EMPTY nor
 * CLEAN, then there may be processes doing (or waiting to do) I/O on the
 * buffer, so the page number may not be changed, and the only allowed state
 * transition is to change WRITE_IN_PROGRESS to DIRTY after dirtying the page.
 * To do any other state transition involving a buffer with potential I/O
 * processes, one must hold both the per-buffer lock and the control lock.
 * (Note the control lock must be acquired second; do not wait on a buffer
 * lock while holding the control lock.)  A process wishing to read a page
 * marks the buffer state as READ_IN_PROGRESS, then drops the control lock,
 * acquires the per-buffer lock, and rechecks the state before proceeding.
 * This recheck takes care of the possibility that someone else already did
 * the read, while the early marking prevents someone else from trying to
 * read the same page into a different buffer.
 *
 * Note we are assuming that read and write of the state value is atomic,
 * since I/O processes may examine and change the state while not holding
 * the control lock.
 *
 * As with the regular buffer manager, it is possible for another process
 * to re-dirty a page that is currently being written out.	This is handled
 * by setting the page's state from WRITE_IN_PROGRESS to DIRTY.  The writing
 * process must notice this and not mark the page CLEAN when it's done.
 *
 * XLOG interactions: this module generates an XLOG record whenever a new
 * CLOG page is initialized to zeroes.	Other writes of CLOG come from
 * recording of transaction commit or abort in xact.c, which generates its
 * own XLOG records for these events and will re-perform the status update
 * on redo; so we need make no additional XLOG entry here.	Also, the XLOG
 * is guaranteed flushed through the XLOG commit record before we are called
 * to log a commit, so the WAL rule "write xlog before data" is satisfied
 * automatically for commits, and we don't really care for aborts.  Therefore,
 * we don't need to mark CLOG pages with LSN information; we have enough
 * synchronization already.
 *----------
 */
/*
 * State of one CLOG buffer slot.
 */
typedef enum
{
	CLOG_PAGE_EMPTY,			/* CLOG buffer is not in use */
	CLOG_PAGE_READ_IN_PROGRESS, /* CLOG page is being read in */
	CLOG_PAGE_CLEAN,			/* CLOG page is valid and not dirty */
	CLOG_PAGE_DIRTY,			/* CLOG page is valid but needs write */
	CLOG_PAGE_WRITE_IN_PROGRESS /* CLOG page is being written out */
} ClogPageStatus;
/*
 * Shared-memory state for CLOG.
 *
 * One instance lives in shared memory (see CLOGShmemInit); the page images
 * that page_buffer[] points to are laid out immediately after the struct.
 */
typedef struct ClogCtlData
{
	/*
	 * Info for each buffer slot.  Page number is undefined when status is
	 * EMPTY.  lru_count is essentially the number of operations since
	 * last use of this page; the page with highest lru_count is the best
	 * candidate to replace.
	 */
	char	   *page_buffer[NUM_CLOG_BUFFERS];	/* CLOG_BLCKSZ bytes each */
	ClogPageStatus page_status[NUM_CLOG_BUFFERS];	/* per-slot state */
	int			page_number[NUM_CLOG_BUFFERS];	/* CLOG page held by slot */
	unsigned int page_lru_count[NUM_CLOG_BUFFERS];	/* see ClogRecentlyUsed */
	/*
	 * latest_page_number is the page number of the current end of the
	 * CLOG; this is not critical data, since we use it only to avoid
	 * swapping out the latest page.
	 */
	int			latest_page_number;
} ClogCtlData;
/* Pointer to the shared control structure; set by CLOGShmemInit */
static ClogCtlData *ClogCtl = NULL;
/*
 * ClogBufferLocks is set during CLOGShmemInit and does not change thereafter.
 * The value is automatically inherited by backends via fork, and
 * doesn't need to be in shared memory.
 */
static LWLockId ClogBufferLocks[NUM_CLOG_BUFFERS];		/* Per-buffer I/O locks */
/*
 * ClogDir is set during CLOGShmemInit and does not change thereafter.
 * The value is automatically inherited by backends via fork, and
 * doesn't need to be in shared memory.
 */
static char ClogDir[MAXPGPATH];
/*
 * Build the path name of segment file "seg" into "path", which must have
 * room for MAXPGPATH bytes.  The file name is the segment number printed
 * as (at least) four upper-case hex digits, inside ClogDir.
 */
#define ClogFileName(path, seg) \
	snprintf(path, MAXPGPATH, "%s/%04X", ClogDir, seg)
/*
 * ClogRecentlyUsed
 *		Age every buffer slot by one tick, then reset the LRU counter of
 *		the given slot to zero so that it is the most recently used one.
 *
 * Within this file the macro is only invoked while the control lock is held.
 */
#define ClogRecentlyUsed(slotno)	\
	do { \
		int		lruidx_ = NUM_CLOG_BUFFERS; \
		while (--lruidx_ >= 0) \
			ClogCtl->page_lru_count[lruidx_] += 1; \
		ClogCtl->page_lru_count[slotno] = 0; \
	} while (0)
/*
 * Forward declarations of file-local routines.
 *
 * Buffer management (control lock held at entry and exit):
 */
static int	ZeroCLOGPage(int pageno, bool writeXlog);
static int	ReadCLOGPage(int pageno);
static void WriteCLOGPage(int slotno);
/* Raw file I/O on one page of one segment file: */
static void CLOGPhysicalReadPage(int pageno, int slotno);
static void CLOGPhysicalWritePage(int pageno, int slotno);
/* Victim selection, truncation helpers, and XLOG record emission: */
static int	SelectLRUCLOGPage(int pageno);
static bool ScanCLOGDirectory(int cutoffPage, bool doDeletions);
static bool CLOGPagePrecedes(int page1, int page2);
static void WriteZeroPageXlogRec(int pageno);
/*
 * TransactionIdSetStatus
 *		Store the final outcome (committed or aborted) of transaction xid
 *		in its two-bit entry of the commit log.
 *
 * The page holding the entry is brought into a shared buffer if needed,
 * the status bits are OR'ed into place, and the buffer is flagged dirty.
 * The control lock is taken and released internally.
 *
 * NB: this is a low-level routine and is NOT the preferred entry point
 * for most uses; TransactionLogUpdate() in transam.c is the intended caller.
 */
void
TransactionIdSetStatus(TransactionId xid, XidStatus status)
{
	int			page;
	int			byteoff;
	int			shift;
	int			slot;
	char	   *cell;

	Assert(status == TRANSACTION_STATUS_COMMITTED ||
		   status == TRANSACTION_STATUS_ABORTED);

	/* Work out where this xid's status bits live */
	page = TransactionIdToPage(xid);
	byteoff = TransactionIdToByte(xid);
	shift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;

	LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);

	slot = ReadCLOGPage(page);
	cell = &ClogCtl->page_buffer[slot][byteoff];

	/* The entry must still be unset, or already hold the target value */
	Assert(((*cell >> shift) & CLOG_XACT_BITMASK) == 0 ||
		   ((*cell >> shift) & CLOG_XACT_BITMASK) == status);

	*cell = *cell | (status << shift);

	/* Remember that the buffer now differs from what is on disk */
	ClogCtl->page_status[slot] = CLOG_PAGE_DIRTY;

	LWLockRelease(CLogControlLock);
}
/*
 * TransactionIdGetStatus
 *		Fetch the two-bit status recorded in the commit log for xid.
 *
 * The page holding the entry is read into a shared buffer if it is not
 * already resident.  The control lock is taken and released internally.
 *
 * NB: this is a low-level routine and is NOT the preferred entry point
 * for most uses; TransactionLogTest() in transam.c is the intended caller.
 */
XidStatus
TransactionIdGetStatus(TransactionId xid)
{
	int			page = TransactionIdToPage(xid);
	int			byteoff = TransactionIdToByte(xid);
	int			shift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;
	int			slot;
	XidStatus	result;

	LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);

	slot = ReadCLOGPage(page);

	/* Shift the entry down to bit zero and discard its neighbours */
	result = (ClogCtl->page_buffer[slot][byteoff] >> shift) & CLOG_XACT_BITMASK;

	LWLockRelease(CLogControlLock);

	return result;
}
/*
 * Initialization of shared memory for CLOG
 *
 * CLOGShmemSize reports how much shared memory CLOG needs: room for
 * NUM_CLOG_BUFFERS page images plus the control struct, rounded up
 * with MAXALIGN.
 */
int
CLOGShmemSize(void)
{
	return MAXALIGN(CLOG_BLCKSZ * NUM_CLOG_BUFFERS + sizeof(ClogCtlData));
}
/*
 * CLOGShmemInit
 *		Allocate and initialize the shared CLOG control structure and its
 *		page buffers, assign one LWLock per buffer, and compute ClogDir.
 *
 * Every slot starts out EMPTY.  latest_page_number is left for later
 * initialization.
 */
void
CLOGShmemInit(void)
{
	bool		found;
	char	   *pagebase;
	int			i;

	/* the size here must agree with space requested by CLOGShmemSize() */
	ClogCtl = (ClogCtlData *) ShmemInitStruct("CLOG Ctl",
											  MAXALIGN(sizeof(ClogCtlData) +
											   CLOG_BLCKSZ * NUM_CLOG_BUFFERS),
											  &found);
	Assert(!found);

	memset(ClogCtl, 0, sizeof(ClogCtlData));

	/* Page images are laid out back to back right after the struct */
	pagebase = (char *) ClogCtl + sizeof(ClogCtlData);

	for (i = 0; i < NUM_CLOG_BUFFERS; i++)
	{
		ClogCtl->page_buffer[i] = pagebase + i * CLOG_BLCKSZ;
		ClogCtl->page_status[i] = CLOG_PAGE_EMPTY;
		ClogBufferLocks[i] = LWLockAssign();
	}

	/* ClogCtl->latest_page_number will be set later */

	/* Remember where the segment files live */
	snprintf(ClogDir, MAXPGPATH, "%s/pg_clog", DataDir);
}
/*
 * BootStrapCLOG
 *		Create the initial CLOG segment: set up page zero as all-zeroes
 *		in a buffer and write it out.
 *
 * This must be called ONCE on system install.  The CLOG directory is
 * assumed to have been created by the initdb shell script, and
 * CLOGShmemInit must have been called already.
 */
void
BootStrapCLOG(void)
{
	int			firstslot;

	LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);

	/* Set up page zero in a buffer; no XLOG record is requested here */
	firstslot = ZeroCLOGPage(0, false);

	/* Push it to disk right away */
	WriteCLOGPage(firstslot);
	Assert(ClogCtl->page_status[firstslot] == CLOG_PAGE_CLEAN);

	LWLockRelease(CLogControlLock);
}
/*
 * ZeroCLOGPage
 *		Set up page "pageno" in a shared buffer as all-zeroes, marking the
 *		buffer dirty, and return the slot number used.
 *
 * Nothing is written to disk here.  When writeXlog is TRUE, an XLOG record
 * describing the zeroing is emitted as well.  The page is also recorded as
 * the latest active CLOG page.
 *
 * Control lock must be held at entry, and will be held at exit.
 */
static int
ZeroCLOGPage(int pageno, bool writeXlog)
{
	int			slot = SelectLRUCLOGPage(pageno);

	/* Either the slot is reusable, or it already holds our page */
	Assert(ClogCtl->page_status[slot] == CLOG_PAGE_EMPTY ||
		   ClogCtl->page_status[slot] == CLOG_PAGE_CLEAN ||
		   ClogCtl->page_number[slot] == pageno);

	/* Claim the slot for this page and make it most-recently-used */
	ClogCtl->page_number[slot] = pageno;
	ClogCtl->page_status[slot] = CLOG_PAGE_DIRTY;
	ClogRecentlyUsed(slot);

	/* Wipe the page image */
	MemSet(ClogCtl->page_buffer[slot], 0, CLOG_BLCKSZ);

	/* Assume this page is now the latest active page */
	ClogCtl->latest_page_number = pageno;

	if (!writeXlog)
		return slot;

	WriteZeroPageXlogRec(pageno);
	return slot;
}
/*
 * Find a CLOG page in a shared buffer, reading it in if necessary.
 * The page number must correspond to an already-initialized page.
 *
 * Return value is the shared-buffer slot number now holding the page.
 * The buffer's LRU access info is updated.
 *
 * Control lock must be held at entry, and will be held at exit.
 * Note that the control lock is released and re-acquired whenever a
 * physical read (or a wait for someone else's read) is needed, so shared
 * state examined before the call may have changed by the time it returns.
 */
static int
ReadCLOGPage(int pageno)
{
	/* Outer loop handles restart if we lose the buffer to someone else */
	for (;;)
	{
		int			slotno;
		/* See if page already is in memory; if not, pick victim slot */
		slotno = SelectLRUCLOGPage(pageno);
		/* Did we find the page in memory? */
		if (ClogCtl->page_number[slotno] == pageno &&
			ClogCtl->page_status[slotno] != CLOG_PAGE_EMPTY)
		{
			/* If page is still being read in, we cannot use it yet */
			if (ClogCtl->page_status[slotno] != CLOG_PAGE_READ_IN_PROGRESS)
			{
				/* otherwise, it's ready to use */
				ClogRecentlyUsed(slotno);
				return slotno;
			}
			/* else fall through and queue up on the buffer lock below */
		}
		else
		{
			/* We found no match; assert we selected a freeable slot */
			Assert(ClogCtl->page_status[slotno] == CLOG_PAGE_EMPTY ||
				   ClogCtl->page_status[slotno] == CLOG_PAGE_CLEAN);
		}
		/* Mark the slot read-busy (no-op if it already was) */
		ClogCtl->page_number[slotno] = pageno;
		ClogCtl->page_status[slotno] = CLOG_PAGE_READ_IN_PROGRESS;
		/*
		 * Temporarily mark page as recently-used to discourage
		 * SelectLRUCLOGPage from selecting it again for someone else.
		 */
		ClogCtl->page_lru_count[slotno] = 0;
		/* Release the control lock, grab the per-buffer lock instead */
		LWLockRelease(CLogControlLock);
		LWLockAcquire(ClogBufferLocks[slotno], LW_EXCLUSIVE);
		/*
		 * Check to see if someone else already did the read, or took the
		 * buffer away from us.  If so, restart from the top.  (The buffer
		 * lock is dropped before the control lock is re-taken, so we never
		 * wait for one while holding the other.)
		 */
		if (ClogCtl->page_number[slotno] != pageno ||
			ClogCtl->page_status[slotno] != CLOG_PAGE_READ_IN_PROGRESS)
		{
			LWLockRelease(ClogBufferLocks[slotno]);
			LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
			continue;
		}
		/* Okay, do the read */
		CLOGPhysicalReadPage(pageno, slotno);
		/*
		 * Re-acquire the control lock (in exclusive mode, while still
		 * holding the buffer lock) and update page state
		 */
		LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
		Assert(ClogCtl->page_number[slotno] == pageno &&
			 ClogCtl->page_status[slotno] == CLOG_PAGE_READ_IN_PROGRESS);
		ClogCtl->page_status[slotno] = CLOG_PAGE_CLEAN;
		LWLockRelease(ClogBufferLocks[slotno]);
		ClogRecentlyUsed(slotno);
		return slotno;
	}
}
/*
 * Write a CLOG page from a shared buffer, if necessary.
 * Does nothing if the specified slot is not dirty.
 *
 * NOTE: only one write attempt is made here.  Hence, it is possible that
 * the page is still dirty at exit (if someone else re-dirtied it during
 * the write).	However, we *do* attempt a fresh write even if the page
 * is already being written; this is for checkpoints.
 *
 * Control lock must be held at entry, and will be held at exit.
 * It is released while the per-buffer lock is held for the actual I/O,
 * unless the slot needed no write at all, in which case it is never
 * released.
 */
static void
WriteCLOGPage(int slotno)
{
	int			pageno;
	/* Do nothing if page does not need writing */
	if (ClogCtl->page_status[slotno] != CLOG_PAGE_DIRTY &&
		ClogCtl->page_status[slotno] != CLOG_PAGE_WRITE_IN_PROGRESS)
		return;
	/* Remember which page we intend to write, for the recheck below */
	pageno = ClogCtl->page_number[slotno];
	/* Release the control lock, grab the per-buffer lock instead */
	LWLockRelease(CLogControlLock);
	LWLockAcquire(ClogBufferLocks[slotno], LW_EXCLUSIVE);
	/*
	 * Check to see if someone else already did the write, or took the
	 * buffer away from us.  If so, do nothing.  NOTE: we really should
	 * never see WRITE_IN_PROGRESS here, since that state should only
	 * occur while the writer is holding the buffer lock.  But accept it
	 * so that we have a recovery path if a writer aborts.
	 */
	if (ClogCtl->page_number[slotno] != pageno ||
		(ClogCtl->page_status[slotno] != CLOG_PAGE_DIRTY &&
		 ClogCtl->page_status[slotno] != CLOG_PAGE_WRITE_IN_PROGRESS))
	{
		LWLockRelease(ClogBufferLocks[slotno]);
		LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
		return;
	}
	/*
	 * Mark the slot write-busy.  After this point, a transaction status
	 * update on this page will mark it dirty again.  NB: we are assuming
	 * that read/write of the page status field is atomic, since we change
	 * the state while not holding control lock.  However, we cannot set
	 * this state any sooner, or we'd possibly fool a previous writer into
	 * thinking he's successfully dumped the page when he hasn't.
	 * (Scenario: other writer starts, page is redirtied, we come along
	 * and set WRITE_IN_PROGRESS again, other writer completes and sets
	 * CLEAN because redirty info has been lost, then we think it's clean
	 * too.)
	 */
	ClogCtl->page_status[slotno] = CLOG_PAGE_WRITE_IN_PROGRESS;
	/* Okay, do the write */
	CLOGPhysicalWritePage(pageno, slotno);
	/*
	 * Re-acquire the control lock (in exclusive mode, while still holding
	 * the buffer lock) and update page state
	 */
	LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
	Assert(ClogCtl->page_number[slotno] == pageno &&
		   (ClogCtl->page_status[slotno] == CLOG_PAGE_WRITE_IN_PROGRESS ||
			ClogCtl->page_status[slotno] == CLOG_PAGE_DIRTY));
	/* Cannot set CLEAN if someone re-dirtied page since write started */
	if (ClogCtl->page_status[slotno] == CLOG_PAGE_WRITE_IN_PROGRESS)
		ClogCtl->page_status[slotno] = CLOG_PAGE_CLEAN;
	LWLockRelease(ClogBufferLocks[slotno]);
}
/*
 * CLOGPhysicalReadPage
 *		Read page "pageno" from its segment file into buffer slot "slotno".
 *
 * The page is expected to exist already.  Any I/O failure is a PANIC,
 * with one exception described below.
 *
 * For now, assume it's not worth keeping a file pointer open across
 * read/write operations.  We could cache one virtual file pointer ...
 */
static void
CLOGPhysicalReadPage(int pageno, int slotno)
{
	int			segno = pageno / CLOG_PAGES_PER_SEGMENT;
	int			offset = (pageno % CLOG_PAGES_PER_SEGMENT) * CLOG_BLCKSZ;
	char	   *dest = ClogCtl->page_buffer[slotno];
	char		path[MAXPGPATH];
	int			fd;

	ClogFileName(path, segno);

	fd = BasicOpenFile(path, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);

	if (fd >= 0)
	{
		/* Normal case: position to the page and slurp it in */
		if (lseek(fd, (off_t) offset, SEEK_SET) < 0)
			elog(PANIC, "lseek of clog file %u, offset %u failed: %m",
				 segno, offset);

		errno = 0;
		if (read(fd, dest, CLOG_BLCKSZ) != CLOG_BLCKSZ)
			elog(PANIC, "read of clog file %u, offset %u failed: %m",
				 segno, offset);

		close(fd);
		return;
	}

	/*
	 * The open failed.  In a crash-and-restart situation, it's possible
	 * for us to receive commands to set the commit status of transactions
	 * whose bits are in already-truncated segments of the commit log (see
	 * notes in CLOGPhysicalWritePage).  Hence, if we are InRecovery, a
	 * missing file is tolerated and the page is returned as zeroes.
	 * Anything else is fatal.
	 */
	if (!(errno == ENOENT && InRecovery))
		elog(PANIC, "open of %s failed: %m", path);

	elog(LOG, "clog file %s doesn't exist, reading as zeroes", path);
	MemSet(dest, 0, CLOG_BLCKSZ);
}
/*
 * Physical write of a page from a buffer slot
 *
 * Writes the CLOG_BLCKSZ bytes of buffer slot "slotno" to the position of
 * page "pageno" within its segment file, creating the file if it does not
 * exist yet.  Every failure (open, create, lseek, write, close) is reported
 * with elog(PANIC).
 *
 * For now, assume it's not worth keeping a file pointer open across
 * read/write operations.  We could cache one virtual file pointer ...
 */
static void
CLOGPhysicalWritePage(int pageno, int slotno)
{
	int			segno = pageno / CLOG_PAGES_PER_SEGMENT;
	int			rpageno = pageno % CLOG_PAGES_PER_SEGMENT;
	int			offset = rpageno * CLOG_BLCKSZ;
	char		path[MAXPGPATH];
	int			fd;

	ClogFileName(path, segno);

	/*
	 * If the file doesn't already exist, we should create it.  It is
	 * possible for this to need to happen when writing a page that's not
	 * first in its segment; we assume the OS can cope with that.  (Note:
	 * it might seem that it'd be okay to create files only when
	 * ZeroCLOGPage is called for the first page of a segment.	However,
	 * if after a crash and restart the REDO logic elects to replay the
	 * log from a checkpoint before the latest one, then it's possible
	 * that we will get commands to set transaction status of transactions
	 * that have already been truncated from the commit log.  Easiest way
	 * to deal with that is to accept references to nonexistent files here
	 * and in CLOGPhysicalReadPage.)
	 */
	fd = BasicOpenFile(path, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
	if (fd < 0)
	{
		if (errno != ENOENT)
			elog(PANIC, "open of %s failed: %m", path);
		fd = BasicOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
						   S_IRUSR | S_IWUSR);
		if (fd < 0)
			elog(PANIC, "creation of file %s failed: %m", path);
	}

	if (lseek(fd, (off_t) offset, SEEK_SET) < 0)
		elog(PANIC, "lseek of clog file %u, offset %u failed: %m",
			 segno, offset);

	errno = 0;
	if (write(fd, ClogCtl->page_buffer[slotno], CLOG_BLCKSZ) != CLOG_BLCKSZ)
	{
		/* if write didn't set errno, assume problem is no disk space */
		if (errno == 0)
			errno = ENOSPC;
		elog(PANIC, "write of clog file %u, offset %u failed: %m",
			 segno, offset);
	}

	/*
	 * Do not ignore the result of close() after a write: on some
	 * filesystems an error from the preceding write is only reported
	 * at close time.  Treat it like any other write failure.
	 */
	if (close(fd) != 0)
		elog(PANIC, "close of clog file %u failed: %m", segno);
}
/*
 * Select the slot to re-use when we need a free slot.
 *
 * The target page number is passed because we need to consider the
 * possibility that some other process reads in the target page while
 * we are doing I/O to free a slot.  Hence, check or recheck to see if
 * any slot already holds the target page, and return that slot if so.
 * Thus, the returned slot is *either* a slot already holding the pageno
 * (could be any state except EMPTY), *or* a freeable slot (state EMPTY
 * or CLEAN).
 *
 * Control lock must be held at entry, and will be held at exit.
 * (ReadCLOGPage and WriteCLOGPage, called from here, may release and
 * re-acquire it; that is why the search is redone after any I/O.)
 */
static int
SelectLRUCLOGPage(int pageno)
{
	/* Outer loop handles restart after I/O */
	for (;;)
	{
		int			slotno;
		/*
		 * NOTE(review): bestslot starts out as slot 0 and is replaced only
		 * by a slot whose lru count is nonzero and which does not hold the
		 * latest page.  If no slot qualifies, slot 0 becomes the victim
		 * regardless of what it holds -- confirm that is acceptable.
		 */
		int			bestslot = 0;
		unsigned int bestcount = 0;
		/* See if page already has a buffer assigned */
		for (slotno = 0; slotno < NUM_CLOG_BUFFERS; slotno++)
		{
			if (ClogCtl->page_number[slotno] == pageno &&
				ClogCtl->page_status[slotno] != CLOG_PAGE_EMPTY)
				return slotno;
		}
		/*
		 * If we find any EMPTY slot, just select that one. Else locate
		 * the least-recently-used slot that isn't the latest CLOG page.
		 */
		for (slotno = 0; slotno < NUM_CLOG_BUFFERS; slotno++)
		{
			if (ClogCtl->page_status[slotno] == CLOG_PAGE_EMPTY)
				return slotno;
			if (ClogCtl->page_lru_count[slotno] > bestcount &&
			 ClogCtl->page_number[slotno] != ClogCtl->latest_page_number)
			{
				bestslot = slotno;
				bestcount = ClogCtl->page_lru_count[slotno];
			}
		}
		/*
		 * If the selected page is clean, we're set.
		 */
		if (ClogCtl->page_status[bestslot] == CLOG_PAGE_CLEAN)
			return bestslot;
		/*
		 * We need to do I/O.  Normal case is that we have to write it
		 * out, but it's possible in the worst case to have selected a
		 * read-busy page.	In that case we use ReadCLOGPage to wait for
		 * the read to complete.
		 */
		if (ClogCtl->page_status[bestslot] == CLOG_PAGE_READ_IN_PROGRESS)
			(void) ReadCLOGPage(ClogCtl->page_number[bestslot]);
		else
			WriteCLOGPage(bestslot);
		/*
		 * Now loop back and try again.  This is the easiest way of
		 * dealing with corner cases such as the victim page being
		 * re-dirtied while we wrote it.
		 */
	}
}
/*
 * This must be called ONCE during postmaster or standalone-backend startup,
 * after StartupXLOG has initialized ShmemVariableCache->nextXid.
 */
void
StartupCLOG(void)
{
	/*
	 * Initialize our idea of the latest page number.
	 */
	ClogCtl->latest_page_number = TransactionIdToPage(ShmemVariableCache->nextXid);
}
/*
 * ShutdownCLOG
 *		Write out every buffer slot that needs it.
 *
 * This must be called ONCE during postmaster or standalone-backend shutdown.
 * Each slot is expected to end up EMPTY or CLEAN after its write.
 */
void
ShutdownCLOG(void)
{
	int			slot = 0;

	LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);

	while (slot < NUM_CLOG_BUFFERS)
	{
		WriteCLOGPage(slot);
		Assert(ClogCtl->page_status[slot] == CLOG_PAGE_EMPTY ||
			   ClogCtl->page_status[slot] == CLOG_PAGE_CLEAN);
		slot++;
	}

	LWLockRelease(CLogControlLock);
}
/*
 * CheckPointCLOG
 *		Perform a checkpoint --- either during shutdown, or on-the-fly ---
 *		by attempting one write of every buffer slot that needs it.
 *
 * Unlike ShutdownCLOG, nothing is asserted about the slot state afterwards:
 * another process might have re-dirtied a page already, and that's okay.
 */
void
CheckPointCLOG(void)
{
	int			slot = 0;

	LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);

	while (slot < NUM_CLOG_BUFFERS)
	{
		WriteCLOGPage(slot);
		slot++;
	}

	LWLockRelease(CLogControlLock);
}
/*
 * ExtendCLOG
 *		Make sure that CLOG has room for a newly-allocated XID.
 *
 * Work is needed only when newestXact is the first XID of a CLOG page, in
 * which case that page is zeroed in a shared buffer and an XLOG record is
 * made about it.
 *
 * NB: this is called while holding XidGenLock.  We want it to be very fast
 * most of the time; even when it's not so fast, no actual I/O need happen
 * unless we're forced to write out a dirty clog or xlog page to make room
 * in shared memory.
 */
void
ExtendCLOG(TransactionId newestXact)
{
	bool		startsPage;

	/*
	 * A page starts at index zero.  But beware: just after wraparound,
	 * the first XID of page zero is FirstNormalTransactionId.
	 */
	startsPage = (TransactionIdToPgIndex(newestXact) == 0) ||
		TransactionIdEquals(newestXact, FirstNormalTransactionId);

	if (startsPage)
	{
		LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);

		/* Zero the page and make an XLOG entry about it */
		(void) ZeroCLOGPage(TransactionIdToPage(newestXact), true);

		LWLockRelease(CLogControlLock);
	}
}
/*
 * Remove all CLOG segments before the one holding the passed transaction ID
 *
 * When this is called, we know that the database logically contains no
 * reference to transaction IDs older than oldestXact.	However, we must
 * not truncate the CLOG until we have performed a checkpoint, to ensure
 * that no such references remain on disk either; else a crash just after
 * the truncation might leave us with a problem.  Since CLOG segments hold
 * a large number of transactions, the opportunity to actually remove a
 * segment is fairly rare, and so it seems best not to do the checkpoint
 * unless we have confirmed that there is a removable segment.	Therefore
 * we issue the checkpoint command here, not in higher-level code as might
 * seem cleaner.
 *
 * Outline: (1) scan the directory without deleting, and quit if nothing is
 * removable; (2) checkpoint; (3) under the control lock, verify we have not
 * wrapped around and discard buffered pages older than the cutoff;
 * (4) scan the directory again, this time unlinking the old segments.
 */
void
TruncateCLOG(TransactionId oldestXact)
{
	int			cutoffPage;
	int			slotno;
	/*
	 * The cutoff point is the start of the segment containing oldestXact.
	 * (Round oldestXact down to a multiple of CLOG_XACTS_PER_SEGMENT.)
	 */
	oldestXact -= oldestXact % CLOG_XACTS_PER_SEGMENT;
	cutoffPage = TransactionIdToPage(oldestXact);
	if (!ScanCLOGDirectory(cutoffPage, false))
		return;					/* nothing to remove */
	/*
	 * Perform a forced CHECKPOINT
	 * NOTE(review): the meaning of the two boolean arguments is defined
	 * outside this file -- confirm against CreateCheckPoint's declaration.
	 */
	CreateCheckPoint(false, true);
	/*
	 * Scan CLOG shared memory and remove any pages preceding the cutoff
	 * page, to ensure we won't rewrite them later.  (Any dirty pages
	 * should have been flushed already during the checkpoint, we're just
	 * being extra careful here.)
	 */
	LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
restart:;
	/*
	 * While we are holding the lock, make an important safety check: the
	 * planned cutoff point must be <= the current CLOG endpoint page.
	 * Otherwise we have already wrapped around, and proceeding with the
	 * truncation would risk removing the current CLOG segment.
	 */
	if (CLOGPagePrecedes(ClogCtl->latest_page_number, cutoffPage))
	{
		LWLockRelease(CLogControlLock);
		elog(LOG, "unable to truncate commit log: apparent wraparound");
		return;
	}
	for (slotno = 0; slotno < NUM_CLOG_BUFFERS; slotno++)
	{
		/* Unused slots and pages at or after the cutoff are left alone */
		if (ClogCtl->page_status[slotno] == CLOG_PAGE_EMPTY)
			continue;
		if (!CLOGPagePrecedes(ClogCtl->page_number[slotno], cutoffPage))
			continue;
		/*
		 * If page is CLEAN, just change state to EMPTY (expected case).
		 */
		if (ClogCtl->page_status[slotno] == CLOG_PAGE_CLEAN)
		{
			ClogCtl->page_status[slotno] = CLOG_PAGE_EMPTY;
			continue;
		}
		/*
		 * Hmm, we have (or may have) I/O operations acting on the page,
		 * so we've got to wait for them to finish and then start again.
		 * This is the same logic as in SelectLRUCLOGPage.  The control
		 * lock may be released inside these calls, hence the full rescan
		 * (including the wraparound check) from "restart".
		 */
		if (ClogCtl->page_status[slotno] == CLOG_PAGE_READ_IN_PROGRESS)
			(void) ReadCLOGPage(ClogCtl->page_number[slotno]);
		else
			WriteCLOGPage(slotno);
		goto restart;
	}
	LWLockRelease(CLogControlLock);
	/* Now we can remove the old CLOG segment(s) */
	(void) ScanCLOGDirectory(cutoffPage, true);
}
/*
 * ScanCLOGDirectory
 *		TruncateCLOG subroutine: look through the CLOG directory for
 *		segment files whose first page precedes cutoffPage.
 *
 * Only entries named with exactly four upper-case hex digits are treated
 * as segment files.  Such files are unlinked iff doDeletions is true.
 * Returns TRUE iff any removable segment was found.  Failure to open or
 * read the directory is a PANIC.  Note: no locking is needed.
 */
static bool
ScanCLOGDirectory(int cutoffPage, bool doDeletions)
{
	DIR		   *dirhandle;
	struct dirent *entry;
	bool		anyRemovable = false;
	char		victim[MAXPGPATH];

	dirhandle = opendir(ClogDir);
	if (dirhandle == NULL)
		elog(PANIC, "could not open transaction-commit log directory (%s): %m",
			 ClogDir);

	/*
	 * errno is cleared before every readdir() call so that, once readdir
	 * returns NULL, a nonzero errno can only mean a read failure.
	 */
	for (errno = 0; (entry = readdir(dirhandle)) != NULL; errno = 0)
	{
		const char *fname = entry->d_name;
		int			segno;

		/* Skip anything that isn't named like a segment file */
		if (strlen(fname) != 4 ||
			strspn(fname, "0123456789ABCDEF") != 4)
			continue;

		segno = (int) strtol(fname, NULL, 16);

		/* Keep segments that start at or after the cutoff */
		if (!CLOGPagePrecedes(segno * CLOG_PAGES_PER_SEGMENT, cutoffPage))
			continue;

		anyRemovable = true;
		if (doDeletions)
		{
			elog(LOG, "removing commit log file %s", fname);
			snprintf(victim, MAXPGPATH, "%s/%s", ClogDir, fname);
			unlink(victim);
		}
	}

	if (errno)
		elog(PANIC, "could not read transaction-commit log directory (%s): %m",
			 ClogDir);

	closedir(dirhandle);

	return anyRemovable;
}
/*
 * Decide which of two CLOG page numbers is "older" for truncation purposes.
 *
 * We need to use comparison of TransactionIds here in order to do the right
 * thing with wraparound XID arithmetic.  However, if we are asked about
 * page number zero, we don't want to hand InvalidTransactionId to
 * TransactionIdPrecedes: it'll get weird about permanent xact IDs.  So,
 * offset both xids by FirstNormalTransactionId to avoid that.
 */
static bool
CLOGPagePrecedes(int page1, int page2)
{
	TransactionId xid1;
	TransactionId xid2;
	xid1 = ((TransactionId) page1) * CLOG_XACTS_PER_PAGE;
	xid1 += FirstNormalTransactionId;
	xid2 = ((TransactionId) page2) * CLOG_XACTS_PER_PAGE;
	xid2 += FirstNormalTransactionId;
	return TransactionIdPrecedes(xid1, xid2);
}
/*
 * WriteZeroPageXlogRec
 *		Emit a ZEROPAGE xlog record whose payload is the page number.
 *
 * Note: xlog record is marked as outside transaction control, since we
 * want it to be redone whether the invoking transaction commits or not.
 * (Besides which, this is normally done just before entering a transaction.)
 */
static void
WriteZeroPageXlogRec(int pageno)
{
	XLogRecData payload;

	/* Single-element chain carrying just the page number */
	payload.next = NULL;
	payload.buffer = InvalidBuffer;
	payload.len = sizeof(int);
	payload.data = (char *) (&pageno);

	(void) XLogInsert(RM_CLOG_ID, CLOG_ZEROPAGE | XLOG_NO_TRAN, &payload);
}
/*
 * CLOG resource manager's routines
 *
 * clog_redo replays a CLOG xlog record.  For a ZEROPAGE record, the page
 * named in the record is re-zeroed in a shared buffer and written out.
 * Any other record type is silently ignored.
 */
void
clog_redo(XLogRecPtr lsn, XLogRecord *record)
{
	uint8		info = record->xl_info & ~XLR_INFO_MASK;
	int			pageno;
	int			slotno;

	if (info != CLOG_ZEROPAGE)
		return;

	/* The record payload is the page number */
	memcpy(&pageno, XLogRecGetData(record), sizeof(int));

	LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);

	/* No new XLOG record is requested while replaying one */
	slotno = ZeroCLOGPage(pageno, false);
	WriteCLOGPage(slotno);
	Assert(ClogCtl->page_status[slotno] == CLOG_PAGE_CLEAN);

	LWLockRelease(CLogControlLock);
}
/*
 * clog_undo: undo routine for CLOG xlog records.  Currently performs no
 * action for any record type.
 */
void
clog_undo(XLogRecPtr lsn, XLogRecord *record)
{
}
/*
 * clog_desc
 *		Append a textual description of a CLOG xlog record to buf.
 *
 * buf must already hold a NUL-terminated string and have room for the
 * appended text; no length check is made here.  "rec" points at the
 * record payload, which for ZEROPAGE records is the page number.
 */
void
clog_desc(char *buf, uint8 xl_info, char *rec)
{
	uint8		info = xl_info & ~XLR_INFO_MASK;
	int			pageno;

	if (info != CLOG_ZEROPAGE)
	{
		strcat(buf, "UNKNOWN");
		return;
	}

	memcpy(&pageno, rec, sizeof(int));
	sprintf(buf + strlen(buf), "zeropage: %d", pageno);
}
 |