summaryrefslogtreecommitdiff
path: root/src/include/access
diff options
context:
space:
mode:
author Amit Kapila <akapila@postgresql.org> 2020-08-08 07:34:39 +0530
committer Amit Kapila <akapila@postgresql.org> 2020-08-08 07:47:06 +0530
commit 7259736a6e5b7c7588fff9578370736a6648acbb (patch)
tree a2261d4ed09124a00d9ed8c0082f22256364aa77 /src/include/access
parent 0a7d771f0f63eb120e7f0a60aecd543ab25ba197 (diff)
Implement streaming mode in ReorderBuffer.
Instead of serializing the transaction to disk after reaching the logical_decoding_work_mem limit in memory, we consume the changes we have in memory and invoke the stream API methods added by commit 45fdc9738b. However, sometimes, if we have an incomplete toast or speculative insert, we spill to disk because we can't generate the complete tuple and stream it. Then, as soon as we get the complete tuple, we stream the transaction, including the serialized changes. We can do this incremental processing thanks to having assignments (associating subxacts with toplevel xacts) in WAL right away, and thanks to logging the invalidation messages at each command end. These features were added by commits 0bead9af48 and c55040ccd0, respectively. Now that we can stream in-progress transactions, concurrent aborts may cause failures when the output plugin consults catalogs (both system and user-defined). We handle such failures by returning the ERRCODE_TRANSACTION_ROLLBACK sqlerrcode from system table scan APIs to the backend or WALSender decoding a specific uncommitted transaction. On receipt of such a sqlerrcode, the decoding logic aborts the decoding of the current transaction and continues with the decoding of other transactions. We have a ReorderBufferTXN pointer in each ReorderBufferChange by which we know which xact it belongs to. The output plugin can use this to decide which changes to discard in case of stream_abort_cb (e.g. when a subxact gets discarded). We also provide a new option via SQL APIs to fetch the changes being streamed. Author: Dilip Kumar, Tomas Vondra, Amit Kapila, Nikhil Sontakke; Reviewed-by: Amit Kapila, Kuntal Ghosh, Ajin Cherian; Tested-by: Neha Sharma, Mahendra Singh Thalor and Ajin Cherian; Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
Diffstat (limited to 'src/include/access')
-rw-r--r-- src/include/access/heapam_xlog.h | 1
-rw-r--r-- src/include/access/tableam.h | 55
-rw-r--r-- src/include/access/xact.h | 4
3 files changed, 60 insertions, 0 deletions
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 95d18cdb12e..aa17f7df84d 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -67,6 +67,7 @@
#define XLH_INSERT_LAST_IN_MULTI (1<<1)
#define XLH_INSERT_IS_SPECULATIVE (1<<2)
#define XLH_INSERT_CONTAINS_NEW_TUPLE (1<<3)
+#define XLH_INSERT_ON_TOAST_RELATION (1<<4)
/*
* xl_heap_update flag values, 8 bits are available.
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 7ba72c84e02..387eb34a61a 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -19,6 +19,7 @@
#include "access/relscan.h"
#include "access/sdir.h"
+#include "access/xact.h"
#include "utils/guc.h"
#include "utils/rel.h"
#include "utils/snapshot.h"
@@ -903,6 +904,15 @@ static inline bool
table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
{
slot->tts_tableOid = RelationGetRelid(sscan->rs_rd);
+
+ /*
+ * We don't expect direct calls to table_scan_getnextslot with valid
+ * CheckXidAlive for catalog or regular tables. See detailed comments in
+ * xact.c where these variables are declared.
+ */
+ if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+ elog(ERROR, "unexpected table_scan_getnextslot call during logical decoding");
+
return sscan->rs_rd->rd_tableam->scan_getnextslot(sscan, direction, slot);
}
@@ -1017,6 +1027,13 @@ table_index_fetch_tuple(struct IndexFetchTableData *scan,
TupleTableSlot *slot,
bool *call_again, bool *all_dead)
{
+ /*
+ * We don't expect direct calls to table_index_fetch_tuple with valid
+ * CheckXidAlive for catalog or regular tables. See detailed comments in
+ * xact.c where these variables are declared.
+ */
+ if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+ elog(ERROR, "unexpected table_index_fetch_tuple call during logical decoding");
return scan->rel->rd_tableam->index_fetch_tuple(scan, tid, snapshot,
slot, call_again,
@@ -1056,6 +1073,14 @@ table_tuple_fetch_row_version(Relation rel,
Snapshot snapshot,
TupleTableSlot *slot)
{
+ /*
+ * We don't expect direct calls to table_tuple_fetch_row_version with
+ * valid CheckXidAlive for catalog or regular tables. See detailed
+ * comments in xact.c where these variables are declared.
+ */
+ if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+ elog(ERROR, "unexpected table_tuple_fetch_row_version call during logical decoding");
+
return rel->rd_tableam->tuple_fetch_row_version(rel, tid, snapshot, slot);
}
@@ -1713,6 +1738,14 @@ static inline bool
table_scan_bitmap_next_block(TableScanDesc scan,
struct TBMIterateResult *tbmres)
{
+ /*
+ * We don't expect direct calls to table_scan_bitmap_next_block with valid
+ * CheckXidAlive for catalog or regular tables. See detailed comments in
+ * xact.c where these variables are declared.
+ */
+ if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+ elog(ERROR, "unexpected table_scan_bitmap_next_block call during logical decoding");
+
return scan->rs_rd->rd_tableam->scan_bitmap_next_block(scan,
tbmres);
}
@@ -1730,6 +1763,14 @@ table_scan_bitmap_next_tuple(TableScanDesc scan,
struct TBMIterateResult *tbmres,
TupleTableSlot *slot)
{
+ /*
+ * We don't expect direct calls to table_scan_bitmap_next_tuple with valid
+ * CheckXidAlive for catalog or regular tables. See detailed comments in
+ * xact.c where these variables are declared.
+ */
+ if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+ elog(ERROR, "unexpected table_scan_bitmap_next_tuple call during logical decoding");
+
return scan->rs_rd->rd_tableam->scan_bitmap_next_tuple(scan,
tbmres,
slot);
@@ -1748,6 +1789,13 @@ static inline bool
table_scan_sample_next_block(TableScanDesc scan,
struct SampleScanState *scanstate)
{
+ /*
+ * We don't expect direct calls to table_scan_sample_next_block with valid
+ * CheckXidAlive for catalog or regular tables. See detailed comments in
+ * xact.c where these variables are declared.
+ */
+ if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+ elog(ERROR, "unexpected table_scan_sample_next_block call during logical decoding");
return scan->rs_rd->rd_tableam->scan_sample_next_block(scan, scanstate);
}
@@ -1764,6 +1812,13 @@ table_scan_sample_next_tuple(TableScanDesc scan,
struct SampleScanState *scanstate,
TupleTableSlot *slot)
{
+ /*
+ * We don't expect direct calls to table_scan_sample_next_tuple with valid
+ * CheckXidAlive for catalog or regular tables. See detailed comments in
+ * xact.c where these variables are declared.
+ */
+ if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+ elog(ERROR, "unexpected table_scan_sample_next_tuple call during logical decoding");
return scan->rs_rd->rd_tableam->scan_sample_next_tuple(scan, scanstate,
slot);
}
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 53480116a46..c18554bae2c 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -81,6 +81,10 @@ typedef enum
/* Synchronous commit level */
extern int synchronous_commit;
+/* used during logical streaming of a transaction */
+extern TransactionId CheckXidAlive;
+extern bool bsysscan;
+
/*
* Miscellaneous flag bits to record events which occur on the top level
* transaction. These flags are only persisted in MyXactFlags and are intended