summaryrefslogtreecommitdiff
path: root/src/backend/commands/subscriptioncmds.c
diff options
context:
space:
mode:
authorPeter Eisentraut <peter_e@gmx.net>2017-04-14 13:58:46 -0400
committerPeter Eisentraut <peter_e@gmx.net>2017-04-14 13:58:46 -0400
commit887227a1cc861d87ca0f175cf8bd1447554090eb (patch)
treef0cc0f4315bd0489083742d4f7a3285fa749f2e9 /src/backend/commands/subscriptioncmds.c
parent25371a72b95aab43b0a3547ead4d3286c1128351 (diff)
Add option to modify sync commit per subscription
This also changes default behaviour of subscription workers to synchronous_commit = off. Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r--src/backend/commands/subscriptioncmds.c54
1 files changed, 45 insertions, 9 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 7b8b11cb81f..519c6846e35 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -44,6 +44,7 @@
#include "storage/lmgr.h"
#include "utils/builtins.h"
+#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
@@ -60,7 +61,7 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
static void
parse_subscription_options(List *options, bool *connect, bool *enabled_given,
bool *enabled, bool *create_slot, char **slot_name,
- bool *copy_data)
+ bool *copy_data, char **synchronous_commit)
{
ListCell *lc;
bool connect_given = false;
@@ -80,6 +81,8 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
*slot_name = NULL;
if (copy_data)
*copy_data = true;
+ if (synchronous_commit)
+ *synchronous_commit = NULL;
/* Parse options */
foreach (lc, options)
@@ -165,6 +168,21 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
copy_data_given = true;
*copy_data = !defGetBoolean(defel);
}
+ else if (strcmp(defel->defname, "synchronous_commit") == 0 &&
+ synchronous_commit)
+ {
+ if (*synchronous_commit)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+
+ *synchronous_commit = defGetString(defel);
+
+ /* Test if the given value is valid for synchronous_commit GUC. */
+ (void) set_config_option("synchronous_commit", *synchronous_commit,
+ PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
+ false, 0, false);
+ }
else
elog(ERROR, "unrecognized option: %s", defel->defname);
}
@@ -269,6 +287,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
bool enabled_given;
bool enabled;
bool copy_data;
+ char *synchronous_commit;
char *conninfo;
char *slotname;
char originname[NAMEDATALEN];
@@ -280,7 +299,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
* Connection and publication should not be specified here.
*/
parse_subscription_options(stmt->options, &connect, &enabled_given,
- &enabled, &create_slot, &slotname, &copy_data);
+ &enabled, &create_slot, &slotname, &copy_data,
+ &synchronous_commit);
/*
* Since creating a replication slot is not transactional, rolling back
@@ -311,6 +331,9 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
if (slotname == NULL)
slotname = stmt->subname;
+ /* The default for synchronous_commit of subscriptions is off. */
+ if (synchronous_commit == NULL)
+ synchronous_commit = "off";
conninfo = stmt->conninfo;
publications = stmt->publication;
@@ -334,6 +357,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
CStringGetTextDatum(conninfo);
values[Anum_pg_subscription_subslotname - 1] =
DirectFunctionCall1(namein, CStringGetDatum(slotname));
+ values[Anum_pg_subscription_subsynccommit - 1] =
+ CStringGetTextDatum(synchronous_commit);
values[Anum_pg_subscription_subpublications - 1] =
publicationListToArray(publications);
@@ -582,13 +607,24 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
case ALTER_SUBSCRIPTION_OPTIONS:
{
char *slot_name;
+ char *synchronous_commit;
parse_subscription_options(stmt->options, NULL, NULL, NULL,
- NULL, &slot_name, NULL);
+ NULL, &slot_name, NULL,
+ &synchronous_commit);
- values[Anum_pg_subscription_subslotname - 1] =
- DirectFunctionCall1(namein, CStringGetDatum(slot_name));
- replaces[Anum_pg_subscription_subslotname - 1] = true;
+ if (slot_name)
+ {
+ values[Anum_pg_subscription_subslotname - 1] =
+ DirectFunctionCall1(namein, CStringGetDatum(slot_name));
+ replaces[Anum_pg_subscription_subslotname - 1] = true;
+ }
+ if (synchronous_commit)
+ {
+ values[Anum_pg_subscription_subsynccommit - 1] =
+ CStringGetTextDatum(synchronous_commit);
+ replaces[Anum_pg_subscription_subsynccommit - 1] = true;
+ }
update_tuple = true;
break;
@@ -601,7 +637,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
parse_subscription_options(stmt->options, NULL,
&enabled_given, &enabled, NULL,
- NULL, NULL);
+ NULL, NULL, NULL);
Assert(enabled_given);
values[Anum_pg_subscription_subenabled - 1] =
@@ -626,7 +662,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
Subscription *sub = GetSubscription(subid, false);
parse_subscription_options(stmt->options, NULL, NULL, NULL,
- NULL, NULL, &copy_data);
+ NULL, NULL, &copy_data, NULL);
values[Anum_pg_subscription_subpublications - 1] =
publicationListToArray(stmt->publication);
@@ -652,7 +688,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
Subscription *sub = GetSubscription(subid, false);
parse_subscription_options(stmt->options, NULL, NULL, NULL,
- NULL, NULL, &copy_data);
+ NULL, NULL, &copy_data, NULL);
AlterSubscription_refresh(sub, copy_data);