summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/reorderbuffer.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r--src/backend/replication/logical/reorderbuffer.c47
1 files changed, 28 insertions, 19 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index be441c7c7d6..6dcd97313f7 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -103,7 +103,7 @@ typedef struct ReorderBufferIterTXNEntry
XLogRecPtr lsn;
ReorderBufferChange *change;
ReorderBufferTXN *txn;
- int fd;
+ File fd;
XLogSegNo segno;
} ReorderBufferIterTXNEntry;
@@ -181,7 +181,8 @@ static void AssertTXNLsnOrder(ReorderBuffer *rb);
* subtransactions
* ---------------------------------------
*/
-static ReorderBufferIterTXNState *ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ ReorderBufferIterTXNState *volatile *iter_state);
static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
ReorderBufferIterTXNState *state);
@@ -197,7 +198,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
int fd, ReorderBufferChange *change);
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
- int *fd, XLogSegNo *segno);
+ File *fd, XLogSegNo *segno);
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
char *change);
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
@@ -953,15 +954,23 @@ ReorderBufferIterCompare(Datum a, Datum b, void *arg)
/*
* Allocate & initialize an iterator which iterates in lsn order over a
* transaction and all its subtransactions.
+ *
+ * Note: The iterator state is returned through iter_state parameter rather
+ * than the function's return value. This is because the state gets cleaned up
+ * in a PG_CATCH block in the caller, so we want to make sure the caller gets
+ * back the state even if this function throws an exception.
*/
-static ReorderBufferIterTXNState *
-ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
+static void
+ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ ReorderBufferIterTXNState *volatile *iter_state)
{
Size nr_txns = 0;
ReorderBufferIterTXNState *state;
dlist_iter cur_txn_i;
int32 off;
+ *iter_state = NULL;
+
/*
* Calculate the size of our heap: one element for every transaction that
* contains changes. (Besides the transactions already in the reorder
@@ -1005,6 +1014,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBufferIterCompare,
state);
+ /* Now that the state fields are initialized, it is safe to return it. */
+ *iter_state = state;
+
/*
* Now insert items into the binary heap, in an unordered fashion. (We
* will run a heap assembly step at the end; this is more efficient.)
@@ -1067,8 +1079,6 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* assemble a valid binary heap */
binaryheap_build(state->heap);
-
- return state;
}
/*
@@ -1172,7 +1182,7 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb,
for (off = 0; off < state->nr_txns; off++)
{
if (state->entries[off].fd != -1)
- CloseTransientFile(state->entries[off].fd);
+ FileClose(state->entries[off].fd);
}
/* free memory we might have "leaked" in the last *Next call */
@@ -1508,7 +1518,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
rb->begin(rb, txn);
- iterstate = ReorderBufferIterTXNInit(rb, txn);
+ ReorderBufferIterTXNInit(rb, txn, &iterstate);
while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
{
Relation relation = NULL;
@@ -2465,7 +2475,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
*/
static Size
ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
- int *fd, XLogSegNo *segno)
+ File *fd, XLogSegNo *segno)
{
Size restored = 0;
XLogSegNo last_segno;
@@ -2510,7 +2520,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
*segno);
- *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
+ *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY, 0);
if (*fd < 0 && errno == ENOENT)
{
*fd = -1;
@@ -2531,14 +2541,13 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
* end of this file.
*/
ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
- pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
- readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
- pgstat_report_wait_end();
+ readBytes = FileRead(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange),
+ WAIT_EVENT_REORDER_BUFFER_READ);
/* eof */
if (readBytes == 0)
{
- CloseTransientFile(*fd);
+ FileClose(*fd);
*fd = -1;
(*segno)++;
continue;
@@ -2560,10 +2569,10 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
sizeof(ReorderBufferDiskChange) + ondisk->size);
ondisk = (ReorderBufferDiskChange *) rb->outbuf;
- pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
- readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
- ondisk->size - sizeof(ReorderBufferDiskChange));
- pgstat_report_wait_end();
+ readBytes = FileRead(*fd,
+ rb->outbuf + sizeof(ReorderBufferDiskChange),
+ ondisk->size - sizeof(ReorderBufferDiskChange),
+ WAIT_EVENT_REORDER_BUFFER_READ);
if (readBytes < 0)
ereport(ERROR,