diff options
author | Magnus Hagander <magnus@hagander.net> | 2011-02-03 13:46:23 +0100 |
---|---|---|
committer | Magnus Hagander <magnus@hagander.net> | 2011-02-03 13:46:23 +0100 |
commit | 76129e7f14b4605db0a046e13abef0e255ffe007 (patch) | |
tree | 90c1ad0ea4f601454399d4b8f4ca3d1cc13035da /src | |
parent | f001cb38b67b0f2f5f4cfd1e32f86866da8c8693 (diff) |
Include more status information in walsender results
Add the current xlog insert location to the response of
IDENTIFY_SYSTEM, and adds result sets containing start
and stop location of backups to BASE_BACKUP responses.
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/basebackup.c | 39 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 27 | ||||
-rw-r--r-- | src/bin/pg_basebackup/pg_basebackup.c | 70 |
3 files changed, 131 insertions, 5 deletions
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index 29284a6ab5e..b5cda5063be 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -52,6 +52,7 @@ static void SendBackupHeader(List *tablespaces); static void base_backup_cleanup(int code, Datum arg); static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir); static void parse_basebackup_options(List *options, basebackup_options *opt); +static void SendXlogRecPtrResult(XLogRecPtr ptr); /* * Size of each block sent into the tar stream for larger files. @@ -92,6 +93,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) char *labelfile; startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &labelfile); + SendXlogRecPtrResult(startptr); PG_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0); { @@ -239,6 +241,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) /* Send CopyDone message for the last tar file */ pq_putemptymessage('c'); } + SendXlogRecPtrResult(endptr); } /* @@ -432,6 +435,42 @@ SendBackupHeader(List *tablespaces) } /* + * Send a single resultset containing just a single + * XlogRecPtr record (in text format) + */ +static void +SendXlogRecPtrResult(XLogRecPtr ptr) +{ + StringInfoData buf; + char str[MAXFNAMELEN]; + + snprintf(str, sizeof(str), "%X/%X", ptr.xlogid, ptr.xrecoff); + + pq_beginmessage(&buf, 'T'); /* RowDescription */ + pq_sendint(&buf, 1, 2); /* 1 field */ + + /* Field header */ + pq_sendstring(&buf, "recptr"); + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, TEXTOID, 4); /* type oid */ + pq_sendint(&buf, -1, 2); + pq_sendint(&buf, 0, 4); + pq_sendint(&buf, 0, 2); + pq_endmessage(&buf); + + /* Data row */ + pq_beginmessage(&buf, 'D'); + pq_sendint(&buf, 1, 2); /* number of columns */ + pq_sendint(&buf, strlen(str), 4); /* length */ + pq_sendbytes(&buf, str, strlen(str)); + pq_endmessage(&buf); + + /* Send a CommandComplete message */ + pq_puttextmessage('C', "SELECT"); +} + +/* * Inject a file with given name and content in the output tar stream. */ static void diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index f70458e01a2..78963c1e6be 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -258,19 +258,26 @@ IdentifySystem(void) StringInfoData buf; char sysid[32]; char tli[11]; + char xpos[MAXFNAMELEN]; + XLogRecPtr logptr; /* - * Reply with a result set with one row, two columns. First col is system - * ID, and second is timeline ID + * Reply with a result set with one row, three columns. First col is system + * ID, second is timeline ID, and third is current xlog location. */ snprintf(sysid, sizeof(sysid), UINT64_FORMAT, GetSystemIdentifier()); snprintf(tli, sizeof(tli), "%u", ThisTimeLineID); + logptr = GetInsertRecPtr(); + + snprintf(xpos, sizeof(xpos), "%X/%X", + logptr.xlogid, logptr.xrecoff); + /* Send a RowDescription message */ pq_beginmessage(&buf, 'T'); - pq_sendint(&buf, 2, 2); /* 2 fields */ + pq_sendint(&buf, 3, 2); /* 3 fields */ /* first field */ pq_sendstring(&buf, "systemid"); /* col name */ @@ -289,15 +296,27 @@ IdentifySystem(void) pq_sendint(&buf, 4, 2); /* typlen */ pq_sendint(&buf, 0, 4); /* typmod */ pq_sendint(&buf, 0, 2); /* format code */ + + /* third field */ + pq_sendstring(&buf, "xlogpos"); + pq_sendint(&buf, 0, 4); + pq_sendint(&buf, 0, 2); + pq_sendint(&buf, TEXTOID, 4); + pq_sendint(&buf, -1, 2); + pq_sendint(&buf, 0, 4); + pq_sendint(&buf, 0, 2); pq_endmessage(&buf); /* Send a DataRow message */ pq_beginmessage(&buf, 'D'); - pq_sendint(&buf, 2, 2); /* # of columns */ + pq_sendint(&buf, 3, 2); /* # of columns */ pq_sendint(&buf, strlen(sysid), 4); /* col1 len */ pq_sendbytes(&buf, (char *) &sysid, strlen(sysid)); pq_sendint(&buf, strlen(tli), 4); /* col2 len */ pq_sendbytes(&buf, (char *) tli, strlen(tli)); + pq_sendint(&buf, strlen(xpos), 4); /* col3 len */ + pq_sendbytes(&buf, (char *) xpos, strlen(xpos)); + pq_endmessage(&buf); /* Send CommandComplete and ReadyForQuery messages */ diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 6708fb7bf58..98414a99c65 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -742,15 +742,40 @@ static void BaseBackup() { PGresult *res; + uint32 timeline; char current_path[MAXPGPATH]; char escaped_label[MAXPGPATH]; int i; + char xlogstart[64]; + char xlogend[64]; /* * Connect in replication mode to the server */ conn = GetConnection(); + /* + * Run IDENFITY_SYSTEM so we can get the timeline + */ + res = PQexec(conn, "IDENTIFY_SYSTEM"); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + fprintf(stderr, _("%s: could not identify system: %s\n"), + progname, PQerrorMessage(conn)); + disconnect_and_exit(1); + } + if (PQntuples(res) != 1) + { + fprintf(stderr, _("%s: could not identify system, got %i rows\n"), + progname, PQntuples(res)); + disconnect_and_exit(1); + } + timeline = atoi(PQgetvalue(res, 0, 1)); + PQclear(res); + + /* + * Start the actual backup + */ PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i); snprintf(current_path, sizeof(current_path), "BASE_BACKUP LABEL '%s' %s %s %s", escaped_label, @@ -766,7 +791,7 @@ BaseBackup() } /* - * Get the header + * Get the starting xlog position */ res = PQgetResult(conn); if (PQresultStatus(res) != PGRES_TUPLES_OK) @@ -775,6 +800,28 @@ BaseBackup() progname, PQerrorMessage(conn)); disconnect_and_exit(1); } + if (PQntuples(res) != 1) + { + fprintf(stderr, _("%s: no start point returned from server.\n"), + progname); + disconnect_and_exit(1); + } + strcpy(xlogstart, PQgetvalue(res, 0, 0)); + if (verbose && includewal) + fprintf(stderr, "xlog start point: %s\n", xlogstart); + PQclear(res); + MemSet(xlogend, 0, sizeof(xlogend)); + + /* + * Get the header + */ + res = PQgetResult(conn); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + fprintf(stderr, _("%s: could not get backup header: %s\n"), + progname, PQerrorMessage(conn)); + disconnect_and_exit(1); + } if (PQntuples(res) < 1) { fprintf(stderr, _("%s: no data returned from server.\n"), progname); @@ -828,6 +875,27 @@ BaseBackup() } PQclear(res); + /* + * Get the stop position + */ + res = PQgetResult(conn); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + fprintf(stderr, _("%s: could not get end xlog position from server.\n"), + progname); + disconnect_and_exit(1); + } + if (PQntuples(res) != 1) + { + fprintf(stderr, _("%s: no end point returned from server.\n"), + progname); + disconnect_and_exit(1); + } + strcpy(xlogend, PQgetvalue(res, 0, 0)); + if (verbose && includewal) + fprintf(stderr, "xlog end point: %s\n", xlogend); + PQclear(res); + res = PQgetResult(conn); if (PQresultStatus(res) != PGRES_COMMAND_OK) { |