summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/reorderbuffer.c40
1 files changed, 25 insertions, 15 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c8464847d5f..1a45873277d 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -102,7 +102,7 @@ typedef struct ReorderBufferIterTXNEntry
XLogRecPtr lsn;
ReorderBufferChange *change;
ReorderBufferTXN *txn;
- int fd;
+ File fd;
XLogSegNo segno;
} ReorderBufferIterTXNEntry;
@@ -183,7 +183,8 @@ static void AssertTXNLsnOrder(ReorderBuffer *rb);
* subtransactions
* ---------------------------------------
*/
-static ReorderBufferIterTXNState *ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ ReorderBufferIterTXNState *volatile *iter_state);
static ReorderBufferChange *
ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
@@ -200,7 +201,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
int fd, ReorderBufferChange *change);
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
- int *fd, XLogSegNo *segno);
+ File *fd, XLogSegNo *segno);
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
char *change);
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
@@ -1004,15 +1005,23 @@ ReorderBufferIterCompare(Datum a, Datum b, void *arg)
/*
* Allocate & initialize an iterator which iterates in lsn order over a
* transaction and all its subtransactions.
+ *
+ * Note: The iterator state is returned through iter_state parameter rather
+ * than the function's return value. This is because the state gets cleaned up
+ * in a PG_CATCH block in the caller, so we want to make sure the caller gets
+ * back the state even if this function throws an exception.
*/
-static ReorderBufferIterTXNState *
-ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
+static void
+ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ ReorderBufferIterTXNState *volatile *iter_state)
{
Size nr_txns = 0;
ReorderBufferIterTXNState *state;
dlist_iter cur_txn_i;
int32 off;
+ *iter_state = NULL;
+
/*
* Calculate the size of our heap: one element for every transaction that
* contains changes. (Besides the transactions already in the reorder
@@ -1056,6 +1065,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBufferIterCompare,
state);
+ /* Now that the state fields are initialized, it is safe to return it. */
+ *iter_state = state;
+
/*
* Now insert items into the binary heap, in an unordered fashion. (We
* will run a heap assembly step at the end; this is more efficient.)
@@ -1118,8 +1130,6 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* assemble a valid binary heap */
binaryheap_build(state->heap);
-
- return state;
}
/*
@@ -1223,7 +1233,7 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb,
for (off = 0; off < state->nr_txns; off++)
{
if (state->entries[off].fd != -1)
- CloseTransientFile(state->entries[off].fd);
+ FileClose(state->entries[off].fd);
}
/* free memory we might have "leaked" in the last *Next call */
@@ -1561,7 +1571,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
rb->begin(rb, txn);
- iterstate = ReorderBufferIterTXNInit(rb, txn);
+ ReorderBufferIterTXNInit(rb, txn, &iterstate);
while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
{
Relation relation = NULL;
@@ -2516,7 +2526,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
*/
static Size
ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
- int *fd, XLogSegNo *segno)
+ File *fd, XLogSegNo *segno)
{
Size restored = 0;
XLogSegNo last_segno;
@@ -2561,7 +2571,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
*segno);
- *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
+ *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY, 0);
if (*fd < 0 && errno == ENOENT)
{
*fd = -1;
@@ -2582,12 +2592,12 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
* end of this file.
*/
ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
- readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
+ readBytes = FileRead(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
/* eof */
if (readBytes == 0)
{
- CloseTransientFile(*fd);
+ FileClose(*fd);
*fd = -1;
(*segno)++;
continue;
@@ -2609,8 +2619,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
sizeof(ReorderBufferDiskChange) + ondisk->size);
ondisk = (ReorderBufferDiskChange *) rb->outbuf;
- readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
- ondisk->size - sizeof(ReorderBufferDiskChange));
+ readBytes = FileRead(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
+ ondisk->size - sizeof(ReorderBufferDiskChange));
if (readBytes < 0)
ereport(ERROR,