summaryrefslogtreecommitdiff
path: root/src/backend/storage/file/buffile.c
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2020-08-26 07:36:43 +0530
committerAmit Kapila <akapila@postgresql.org>2020-08-26 07:36:43 +0530
commit808e13b282efa7e7ac7b78e886aca5684f4bccd3 (patch)
tree4a101d87e61d38297cc1cd30fac789cd3899c5e3 /src/backend/storage/file/buffile.c
parentadc8fc6167aa3f68b951ddd60ea32a62b13f18d6 (diff)
Extend the BufFile interface.
Allow BufFile to support temporary files that can be used by the single backend when the corresponding files need to be survived across the transaction and need to be opened and closed multiple times. Such files need to be created as a member of a SharedFileSet. Additionally, this commit implements the interface for BufFileTruncate to allow files to be truncated up to a particular offset and extends the BufFileSeek API to support the SEEK_END case. This also adds an option to provide a mode while opening the shared BufFiles instead of always opening in read-only mode. These enhancements in BufFile interface are required for the upcoming patch to allow the replication apply worker, to handle streamed in-progress transactions. Author: Dilip Kumar, Amit Kapila Reviewed-by: Amit Kapila Tested-by: Neha Sharma Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
Diffstat (limited to 'src/backend/storage/file/buffile.c')
-rw-r--r--src/backend/storage/file/buffile.c129
1 files changed, 119 insertions, 10 deletions
diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c
index 2d7a0823208..d581f96eda9 100644
--- a/src/backend/storage/file/buffile.c
+++ b/src/backend/storage/file/buffile.c
@@ -32,10 +32,14 @@
* (by opening multiple fd.c temporary files). This is an essential feature
* for sorts and hashjoins on large amounts of data.
*
- * BufFile supports temporary files that can be made read-only and shared with
- * other backends, as infrastructure for parallel execution. Such files need
- * to be created as a member of a SharedFileSet that all participants are
- * attached to.
+ * BufFile supports temporary files that can be shared with other backends, as
+ * infrastructure for parallel execution. Such files need to be created as a
+ * member of a SharedFileSet that all participants are attached to.
+ *
+ * BufFile also supports temporary files that can be used by the single backend
+ * when the corresponding files need to be survived across the transaction and
+ * need to be opened and closed multiple times. Such files need to be created
+ * as a member of a SharedFileSet.
*-------------------------------------------------------------------------
*/
@@ -277,7 +281,7 @@ BufFileCreateShared(SharedFileSet *fileset, const char *name)
* backends and render it read-only.
*/
BufFile *
-BufFileOpenShared(SharedFileSet *fileset, const char *name)
+BufFileOpenShared(SharedFileSet *fileset, const char *name, int mode)
{
BufFile *file;
char segment_name[MAXPGPATH];
@@ -301,7 +305,7 @@ BufFileOpenShared(SharedFileSet *fileset, const char *name)
}
/* Try to load a segment. */
SharedSegmentName(segment_name, name, nfiles);
- files[nfiles] = SharedFileSetOpen(fileset, segment_name);
+ files[nfiles] = SharedFileSetOpen(fileset, segment_name, mode);
if (files[nfiles] <= 0)
break;
++nfiles;
@@ -321,7 +325,7 @@ BufFileOpenShared(SharedFileSet *fileset, const char *name)
file = makeBufFileCommon(nfiles);
file->files = files;
- file->readOnly = true; /* Can't write to files opened this way */
+ file->readOnly = (mode == O_RDONLY) ? true : false;
file->fileset = fileset;
file->name = pstrdup(name);
@@ -666,11 +670,21 @@ BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
newFile = file->curFile;
newOffset = (file->curOffset + file->pos) + offset;
break;
-#ifdef NOT_USED
case SEEK_END:
- /* could be implemented, not needed currently */
+
+ /*
+ * The file size of the last file gives us the end offset of that
+ * file.
+ */
+ newFile = file->numFiles - 1;
+ newOffset = FileSize(file->files[file->numFiles - 1]);
+ if (newOffset < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not determine size of temporary file \"%s\" from BufFile \"%s\": %m",
+ FilePathName(file->files[file->numFiles - 1]),
+ file->name)));
break;
-#endif
default:
elog(ERROR, "invalid whence: %d", whence);
return EOF;
@@ -838,3 +852,98 @@ BufFileAppend(BufFile *target, BufFile *source)
return startBlock;
}
+
+/*
+ * Truncate a BufFile created by BufFileCreateShared up to the given fileno and
+ * the offset.
+ */
+void
+BufFileTruncateShared(BufFile *file, int fileno, off_t offset)
+{
+ int numFiles = file->numFiles;
+ int newFile = fileno;
+ off_t newOffset = file->curOffset;
+ char segment_name[MAXPGPATH];
+ int i;
+
+ /*
+ * Loop over all the files up to the given fileno and remove the files
+ * that are greater than the fileno and truncate the given file up to the
+ * offset. Note that we also remove the given fileno if the offset is 0
+ * provided it is not the first file in which we truncate it.
+ */
+ for (i = file->numFiles - 1; i >= fileno; i--)
+ {
+ if ((i != fileno || offset == 0) && i != 0)
+ {
+ SharedSegmentName(segment_name, file->name, i);
+ FileClose(file->files[i]);
+ if (!SharedFileSetDelete(file->fileset, segment_name, true))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not delete shared fileset \"%s\": %m",
+ segment_name)));
+ numFiles--;
+ newOffset = MAX_PHYSICAL_FILESIZE;
+
+ /*
+ * This is required to indicate that we have deleted the given
+ * fileno.
+ */
+ if (i == fileno)
+ newFile--;
+ }
+ else
+ {
+ if (FileTruncate(file->files[i], offset,
+ WAIT_EVENT_BUFFILE_TRUNCATE) < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not truncate file \"%s\": %m",
+ FilePathName(file->files[i]))));
+ newOffset = offset;
+ }
+ }
+
+ file->numFiles = numFiles;
+
+ /*
+ * If the truncate point is within existing buffer then we can just adjust
+ * pos within buffer.
+ */
+ if (newFile == file->curFile &&
+ newOffset >= file->curOffset &&
+ newOffset <= file->curOffset + file->nbytes)
+ {
+ /* No need to reset the current pos if the new pos is greater. */
+ if (newOffset <= file->curOffset + file->pos)
+ file->pos = (int) (newOffset - file->curOffset);
+
+ /* Adjust the nbytes for the current buffer. */
+ file->nbytes = (int) (newOffset - file->curOffset);
+ }
+ else if (newFile == file->curFile &&
+ newOffset < file->curOffset)
+ {
+ /*
+ * The truncate point is within the existing file but prior to the
+ * current position, so we can forget the current buffer and reset the
+ * current position.
+ */
+ file->curOffset = newOffset;
+ file->pos = 0;
+ file->nbytes = 0;
+ }
+ else if (newFile < file->curFile)
+ {
+ /*
+ * The truncate point is prior to the current file, so need to reset
+ * the current position accordingly.
+ */
+ file->curFile = newFile;
+ file->curOffset = newOffset;
+ file->pos = 0;
+ file->nbytes = 0;
+ }
+ /* Nothing to do, if the truncate point is beyond current file. */
+}