summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTom Lane <tgl@sss.pgh.pa.us>2021-06-12 12:59:15 -0400
committerTom Lane <tgl@sss.pgh.pa.us>2021-06-12 12:59:15 -0400
commitfe6a20ce54cbbb6fcfe9f6675d563af836ae799a (patch)
tree79aab32108476f8a1a4c5bb7883616be468722af
parentc3652f976b7696a96a9c5606cc2d613af77e2e63 (diff)
Don't use Asserts to check for violations of replication protocol.
Using an Assert to check the validity of incoming messages is an extremely poor decision. In a debug build, it should not be that easy for a broken or malicious remote client to crash the logrep worker. The consequences could be even worse in non-debug builds, which will fail to make such checks at all, leading to who-knows-what misbehavior. Hence, promote every Assert that could possibly be triggered by wrong or out-of-order replication messages to a full test-and-ereport. To avoid bloating the set of messages the translation team has to cope with, establish a policy that replication protocol violation error reports don't need to be translated. Hence, all the new messages here use errmsg_internal(). A couple of old messages are changed likewise for consistency. Along the way, fix some non-idiomatic or outright wrong uses of hash_search(). Most of these mistakes are new with the "streaming replication" patch (commit 464824323), but a couple go back a long way. Back-patch as appropriate. Discussion: https://postgr.es/m/1719083.1623351052@sss.pgh.pa.us
-rw-r--r--src/backend/replication/logical/reorderbuffer.c2
-rw-r--r--src/backend/replication/logical/worker.c118
2 files changed, 85 insertions, 35 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 2d9e1279bb2..f96029f15a4 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1703,7 +1703,7 @@ ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
ent = (ReorderBufferTupleCidEnt *)
hash_search(txn->tuplecid_hash,
(void *) &key,
- HASH_ENTER | HASH_FIND,
+ HASH_ENTER,
&found);
if (!found)
{
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 689a66cc72d..4b112593c65 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -177,7 +177,7 @@ bool in_remote_transaction = false;
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
/* fields valid only when processing streamed transaction */
-bool in_streamed_transaction = false;
+static bool in_streamed_transaction = false;
static TransactionId stream_xid = InvalidTransactionId;
@@ -345,7 +345,10 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
*/
xid = pq_getmsgint(s, 4);
- Assert(TransactionIdIsValid(xid));
+ if (!TransactionIdIsValid(xid))
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("invalid transaction ID in streamed replication transaction")));
/* Add the new subxact to the array (unless already there). */
subxact_info_add(xid);
@@ -785,7 +788,12 @@ apply_handle_commit(StringInfo s)
logicalrep_read_commit(s, &commit_data);
- Assert(commit_data.commit_lsn == remote_final_lsn);
+ if (commit_data.commit_lsn != remote_final_lsn)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
+ LSN_FORMAT_ARGS(commit_data.commit_lsn),
+ LSN_FORMAT_ARGS(remote_final_lsn))));
apply_handle_commit_internal(s, &commit_data);
@@ -812,7 +820,7 @@ apply_handle_origin(StringInfo s)
(IsTransactionState() && !am_tablesync_worker())))
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("ORIGIN message sent out of order")));
+ errmsg_internal("ORIGIN message sent out of order")));
}
/*
@@ -824,7 +832,10 @@ apply_handle_stream_start(StringInfo s)
bool first_segment;
HASHCTL hash_ctl;
- Assert(!in_streamed_transaction);
+ if (in_streamed_transaction)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("duplicate STREAM START message")));
/*
* Start a transaction on stream start, this transaction will be committed
@@ -841,6 +852,11 @@ apply_handle_stream_start(StringInfo s)
/* extract XID of the top-level transaction */
stream_xid = logicalrep_read_stream_start(s, &first_segment);
+ if (!TransactionIdIsValid(stream_xid))
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("invalid transaction ID in streamed replication transaction")));
+
/*
* Initialize the xidhash table if we haven't yet. This will be used for
* the entire duration of the apply worker so create it in permanent
@@ -873,7 +889,10 @@ apply_handle_stream_start(StringInfo s)
static void
apply_handle_stream_stop(StringInfo s)
{
- Assert(in_streamed_transaction);
+ if (!in_streamed_transaction)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("STREAM STOP message without STREAM START")));
/*
* Close the file with serialized changes, and serialize information about
@@ -905,7 +924,10 @@ apply_handle_stream_abort(StringInfo s)
TransactionId xid;
TransactionId subxid;
- Assert(!in_streamed_transaction);
+ if (in_streamed_transaction)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("STREAM ABORT message without STREAM STOP")));
logicalrep_read_stream_abort(s, &xid, &subxid);
@@ -932,7 +954,6 @@ apply_handle_stream_abort(StringInfo s)
* performed rollback to savepoint for one of the earlier
* sub-transaction.
*/
-
int64 i;
int64 subidx;
BufFile *fd;
@@ -967,13 +988,15 @@ apply_handle_stream_abort(StringInfo s)
return;
}
- Assert((subidx >= 0) && (subidx < subxact_data.nsubxacts));
-
ent = (StreamXidHash *) hash_search(xidhash,
(void *) &xid,
HASH_FIND,
- &found);
- Assert(found);
+ NULL);
+ if (!ent)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("transaction %u not found in stream XID hash table",
+ xid)));
/* open the changes file */
changes_filename(path, MyLogicalRepWorker->subid, xid);
@@ -1006,13 +1029,15 @@ apply_handle_stream_commit(StringInfo s)
int nchanges;
char path[MAXPGPATH];
char *buffer = NULL;
- bool found;
LogicalRepCommitData commit_data;
StreamXidHash *ent;
MemoryContext oldcxt;
BufFile *fd;
- Assert(!in_streamed_transaction);
+ if (in_streamed_transaction)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("STREAM COMMIT message without STREAM STOP")));
xid = logicalrep_read_stream_commit(s, &commit_data);
@@ -1031,11 +1056,17 @@ apply_handle_stream_commit(StringInfo s)
/* open the spool file for the committed transaction */
changes_filename(path, MyLogicalRepWorker->subid, xid);
elog(DEBUG1, "replaying changes from file \"%s\"", path);
+
ent = (StreamXidHash *) hash_search(xidhash,
(void *) &xid,
HASH_FIND,
- &found);
- Assert(found);
+ NULL);
+ if (!ent)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("transaction %u not found in stream XID hash table",
+ xid)));
+
fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY);
buffer = palloc(BLCKSZ);
@@ -1080,7 +1111,9 @@ apply_handle_stream_commit(StringInfo s)
errmsg("could not read from streaming transaction's changes file \"%s\": %m",
path)));
- Assert(len > 0);
+ if (len <= 0)
+ elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
+ len, path);
/* make sure we have sufficiently large buffer */
buffer = repalloc(buffer, len);
@@ -1108,7 +1141,7 @@ apply_handle_stream_commit(StringInfo s)
nchanges++;
if (nchanges % 1000 == 0)
- elog(DEBUG1, "replayed %d changes from file '%s'",
+ elog(DEBUG1, "replayed %d changes from file \"%s\"",
nchanges, path);
}
@@ -2053,7 +2086,8 @@ apply_dispatch(StringInfo s)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("invalid logical replication message type \"%c\"", action)));
+ errmsg_internal("invalid logical replication message type \"%c\"",
+ action)));
}
/*
@@ -2589,20 +2623,19 @@ static void
subxact_info_write(Oid subid, TransactionId xid)
{
char path[MAXPGPATH];
- bool found;
Size len;
StreamXidHash *ent;
BufFile *fd;
Assert(TransactionIdIsValid(xid));
- /* find the xid entry in the xidhash */
+ /* Find the xid entry in the xidhash */
ent = (StreamXidHash *) hash_search(xidhash,
(void *) &xid,
HASH_FIND,
- &found);
- /* we must found the entry for its top transaction by this time */
- Assert(found);
+ NULL);
+ /* By this time we must have created the transaction entry */
+ Assert(ent);
/*
* If there is no subtransaction then nothing to do, but if already have
@@ -2667,13 +2700,11 @@ static void
subxact_info_read(Oid subid, TransactionId xid)
{
char path[MAXPGPATH];
- bool found;
Size len;
BufFile *fd;
StreamXidHash *ent;
MemoryContext oldctx;
- Assert(TransactionIdIsValid(xid));
Assert(!subxact_data.subxacts);
Assert(subxact_data.nsubxacts == 0);
Assert(subxact_data.nsubxacts_max == 0);
@@ -2682,7 +2713,12 @@ subxact_info_read(Oid subid, TransactionId xid)
ent = (StreamXidHash *) hash_search(xidhash,
(void *) &xid,
HASH_FIND,
- &found);
+ NULL);
+ if (!ent)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("transaction %u not found in stream XID hash table",
+ xid)));
/*
* If subxact_fileset is not valid that mean we don't have any subxact
@@ -2836,14 +2872,17 @@ stream_cleanup_files(Oid subid, TransactionId xid)
{
char path[MAXPGPATH];
StreamXidHash *ent;
- bool found = false;
- /* By this time we must have created the transaction entry */
+ /* Find the xid entry in the xidhash */
ent = (StreamXidHash *) hash_search(xidhash,
(void *) &xid,
HASH_FIND,
- &found);
- Assert(found);
+ NULL);
+ if (!ent)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("transaction %u not found in stream XID hash table",
+ xid)));
/* Delete the change file and release the stream fileset memory */
changes_filename(path, subid, xid);
@@ -2893,9 +2932,9 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
/* create or find the xid entry in the xidhash */
ent = (StreamXidHash *) hash_search(xidhash,
(void *) &xid,
- HASH_ENTER | HASH_FIND,
+ HASH_ENTER,
&found);
- Assert(first_segment || found);
+
changes_filename(path, subid, xid);
elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
@@ -2915,6 +2954,11 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
MemoryContext savectx;
SharedFileSet *fileset;
+ if (found)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
+
/*
* We need to maintain shared fileset across multiple stream
* start/stop calls. So, need to allocate it in a persistent context.
@@ -2934,6 +2978,11 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
}
else
{
+ if (!found)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
+
/*
* Open the file and seek to the end of the file because we always
* append the changes file.
@@ -3140,7 +3189,8 @@ ApplyWorkerMain(Datum main_arg)
*/
if (!myslotname)
ereport(ERROR,
- (errmsg("subscription has no replication slot set")));
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("subscription has no replication slot set")));
/* Setup replication origin tracking. */
StartTransactionCommand();