Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
P
Postgres FD Implementation
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Abuhujair Javed
Postgres FD Implementation
Commits
cada1af3
Commit
cada1af3
authored
Jan 17, 2017
by
Magnus Hagander
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Add compression support to pg_receivexlog
Author: Michael Paquier, review and small changes by me
parent
974ece58
Changes
5
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
197 additions
and
20 deletions
+197
-20
doc/src/sgml/ref/pg_receivexlog.sgml
doc/src/sgml/ref/pg_receivexlog.sgml
+13
-0
src/bin/pg_basebackup/pg_basebackup.c
src/bin/pg_basebackup/pg_basebackup.c
+1
-1
src/bin/pg_basebackup/pg_receivexlog.c
src/bin/pg_basebackup/pg_receivexlog.c
+100
-4
src/bin/pg_basebackup/walmethods.c
src/bin/pg_basebackup/walmethods.c
+81
-14
src/bin/pg_basebackup/walmethods.h
src/bin/pg_basebackup/walmethods.h
+2
-1
No files found.
doc/src/sgml/ref/pg_receivexlog.sgml
View file @
cada1af3
...
...
@@ -180,6 +180,19 @@ PostgreSQL documentation
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-Z <replaceable class="parameter">level</replaceable></option></term>
<term><option>--compress=<replaceable class="parameter">level</replaceable></option></term>
<listitem>
<para>
Enables gzip compression of transaction logs, and specifies the
compression level (0 through 9, 0 being no compression and 9 being best
compression). The suffix <filename>.gz</filename> will
automatically be added to all filenames.
</para>
</listitem>
</varlistentry>
</variablelist>
<para>
...
...
src/bin/pg_basebackup/pg_basebackup.c
View file @
cada1af3
...
...
@@ -494,7 +494,7 @@ LogStreamerMain(logstreamer_param *param)
stream
.
replication_slot
=
psprintf
(
"pg_basebackup_%d"
,
(
int
)
getpid
());
if
(
format
==
'p'
)
stream
.
walmethod
=
CreateWalDirectoryMethod
(
param
->
xlog
,
do_sync
);
stream
.
walmethod
=
CreateWalDirectoryMethod
(
param
->
xlog
,
0
,
do_sync
);
else
stream
.
walmethod
=
CreateWalTarMethod
(
param
->
xlog
,
compresslevel
,
do_sync
);
...
...
src/bin/pg_basebackup/pg_receivexlog.c
View file @
cada1af3
...
...
@@ -34,6 +34,7 @@
/* Global options */
static
char
*
basedir
=
NULL
;
static
int
verbose
=
0
;
static
int
compresslevel
=
0
;
static
int
noloop
=
0
;
static
int
standby_message_timeout
=
10
*
1000
;
/* 10 sec = default */
static
volatile
bool
time_to_abort
=
false
;
...
...
@@ -58,6 +59,15 @@ static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline,
exit(code); \
}
/* Routines to evaluate segment file format */
#define IsCompressXLogFileName(fname) \
(strlen(fname) == XLOG_FNAME_LEN + strlen(".gz") && \
strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN && \
strcmp((fname) + XLOG_FNAME_LEN, ".gz") == 0)
#define IsPartialCompressXLogFileName(fname) \
(strlen(fname) == XLOG_FNAME_LEN + strlen(".gz.partial") && \
strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN && \
strcmp((fname) + XLOG_FNAME_LEN, ".gz.partial") == 0)
static
void
usage
(
void
)
...
...
@@ -76,6 +86,7 @@ usage(void)
printf
(
_
(
" --synchronous flush transaction log immediately after writing
\n
"
));
printf
(
_
(
" -v, --verbose output verbose messages
\n
"
));
printf
(
_
(
" -V, --version output version information, then exit
\n
"
));
printf
(
_
(
" -Z, --compress=0-9 compress logs with given compression level
\n
"
));
printf
(
_
(
" -?, --help show this help, then exit
\n
"
));
printf
(
_
(
"
\n
Connection options:
\n
"
));
printf
(
_
(
" -d, --dbname=CONNSTR connection string
\n
"
));
...
...
@@ -188,14 +199,31 @@ FindStreamingStart(uint32 *tli)
uint32
tli
;
XLogSegNo
segno
;
bool
ispartial
;
bool
iscompress
;
/*
* Check if the filename looks like an xlog file, or a .partial file.
*/
if
(
IsXLogFileName
(
dirent
->
d_name
))
{
ispartial
=
false
;
iscompress
=
false
;
}
else
if
(
IsPartialXLogFileName
(
dirent
->
d_name
))
{
ispartial
=
true
;
iscompress
=
false
;
}
else
if
(
IsCompressXLogFileName
(
dirent
->
d_name
))
{
ispartial
=
false
;
iscompress
=
true
;
}
else
if
(
IsPartialCompressXLogFileName
(
dirent
->
d_name
))
{
ispartial
=
true
;
iscompress
=
true
;
}
else
continue
;
...
...
@@ -206,9 +234,15 @@ FindStreamingStart(uint32 *tli)
/*
* Check that the segment has the right size, if it's supposed to be
* completed.
* completed. For non-compressed segments just check the on-disk size
* and see if it matches a completed segment.
* For compressed segments, look at the last 4 bytes of the compressed
* file, which is where the uncompressed size is located for gz files
* with a size lower than 4GB, and then compare it to the size of a
* completed segment. The 4 last bytes correspond to the ISIZE member
* according to http://www.zlib.org/rfc-gzip.html.
*/
if
(
!
ispartial
)
if
(
!
ispartial
&&
!
iscompress
)
{
struct
stat
statbuf
;
char
fullpath
[
MAXPGPATH
];
...
...
@@ -229,6 +263,47 @@ FindStreamingStart(uint32 *tli)
continue
;
}
}
else
if
(
!
ispartial
&&
iscompress
)
{
int
fd
;
char
buf
[
4
];
int
bytes_out
;
char
fullpath
[
MAXPGPATH
];
snprintf
(
fullpath
,
sizeof
(
fullpath
),
"%s/%s"
,
basedir
,
dirent
->
d_name
);
fd
=
open
(
fullpath
,
O_RDONLY
|
PG_BINARY
);
if
(
fd
<
0
)
{
fprintf
(
stderr
,
_
(
"%s: could not open compressed file
\"
%s
\"
: %s
\n
"
),
progname
,
fullpath
,
strerror
(
errno
));
disconnect_and_exit
(
1
);
}
if
(
lseek
(
fd
,
(
off_t
)(
-
4
),
SEEK_END
)
<
0
)
{
fprintf
(
stderr
,
_
(
"%s: could not seek compressed file
\"
%s
\"
: %s
\n
"
),
progname
,
fullpath
,
strerror
(
errno
));
disconnect_and_exit
(
1
);
}
if
(
read
(
fd
,
(
char
*
)
buf
,
sizeof
(
buf
))
!=
sizeof
(
buf
))
{
fprintf
(
stderr
,
_
(
"%s: could not read compressed file
\"
%s
\"
: %s
\n
"
),
progname
,
fullpath
,
strerror
(
errno
));
disconnect_and_exit
(
1
);
}
close
(
fd
);
bytes_out
=
(
buf
[
3
]
<<
24
)
|
(
buf
[
2
]
<<
16
)
|
(
buf
[
1
]
<<
8
)
|
buf
[
0
];
if
(
bytes_out
!=
XLOG_SEG_SIZE
)
{
fprintf
(
stderr
,
_
(
"%s: compressed segment file
\"
%s
\"
has incorrect uncompressed size %d, skipping
\n
"
),
progname
,
dirent
->
d_name
,
bytes_out
);
continue
;
}
}
/* Looks like a valid segment. Remember that we saw it. */
if
((
segno
>
high_segno
)
||
...
...
@@ -339,7 +414,8 @@ StreamLog(void)
stream
.
synchronous
=
synchronous
;
stream
.
do_sync
=
true
;
stream
.
mark_done
=
false
;
stream
.
walmethod
=
CreateWalDirectoryMethod
(
basedir
,
stream
.
do_sync
);
stream
.
walmethod
=
CreateWalDirectoryMethod
(
basedir
,
compresslevel
,
stream
.
do_sync
);
stream
.
partial_suffix
=
".partial"
;
stream
.
replication_slot
=
replication_slot
;
stream
.
temp_slot
=
false
;
...
...
@@ -392,6 +468,7 @@ main(int argc, char **argv)
{
"status-interval"
,
required_argument
,
NULL
,
's'
},
{
"slot"
,
required_argument
,
NULL
,
'S'
},
{
"verbose"
,
no_argument
,
NULL
,
'v'
},
{
"compress"
,
required_argument
,
NULL
,
'Z'
},
/* action */
{
"create-slot"
,
no_argument
,
NULL
,
1
},
{
"drop-slot"
,
no_argument
,
NULL
,
2
},
...
...
@@ -422,7 +499,7 @@ main(int argc, char **argv)
}
}
while
((
c
=
getopt_long
(
argc
,
argv
,
"D:d:h:p:U:s:S:nwWv"
,
while
((
c
=
getopt_long
(
argc
,
argv
,
"D:d:h:p:U:s:S:nwWv
Z:
"
,
long_options
,
&
option_index
))
!=
-
1
)
{
switch
(
c
)
...
...
@@ -472,6 +549,15 @@ main(int argc, char **argv)
case
'v'
:
verbose
++
;
break
;
case
'Z'
:
compresslevel
=
atoi
(
optarg
);
if
(
compresslevel
<
0
||
compresslevel
>
9
)
{
fprintf
(
stderr
,
_
(
"%s: invalid compression level
\"
%s
\"\n
"
),
progname
,
optarg
);
exit
(
1
);
}
break
;
/* action */
case
1
:
do_create_slot
=
true
;
...
...
@@ -538,6 +624,16 @@ main(int argc, char **argv)
exit
(
1
);
}
#ifndef HAVE_LIBZ
if
(
compresslevel
!=
0
)
{
fprintf
(
stderr
,
_
(
"%s: this build does not support compression
\n
"
),
progname
);
exit
(
1
);
}
#endif
/*
* Check existence of destination folder.
*/
...
...
src/bin/pg_basebackup/walmethods.c
View file @
cada1af3
...
...
@@ -41,6 +41,7 @@
typedef
struct
DirectoryMethodData
{
char
*
basedir
;
int
compression
;
bool
sync
;
}
DirectoryMethodData
;
static
DirectoryMethodData
*
dir_data
=
NULL
;
...
...
@@ -55,6 +56,9 @@ typedef struct DirectoryMethodFile
char
*
pathname
;
char
*
fullpath
;
char
*
temp_suffix
;
#ifdef HAVE_LIBZ
gzFile
gzfp
;
#endif
}
DirectoryMethodFile
;
static
char
*
...
...
@@ -70,17 +74,47 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
static
char
tmppath
[
MAXPGPATH
];
int
fd
;
DirectoryMethodFile
*
f
;
#ifdef HAVE_LIBZ
gzFile
gzfp
=
NULL
;
#endif
snprintf
(
tmppath
,
sizeof
(
tmppath
),
"%s/%s%s"
,
dir_data
->
basedir
,
pathname
,
temp_suffix
?
temp_suffix
:
""
);
snprintf
(
tmppath
,
sizeof
(
tmppath
),
"%s/%s%s%s"
,
dir_data
->
basedir
,
pathname
,
dir_data
->
compression
>
0
?
".gz"
:
""
,
temp_suffix
?
temp_suffix
:
""
);
/*
* Open a file for non-compressed as well as compressed files. Tracking
* the file descriptor is important for dir_sync() method as gzflush()
* does not do any system calls to fsync() to make changes permanent on
* disk.
*/
fd
=
open
(
tmppath
,
O_WRONLY
|
O_CREAT
|
PG_BINARY
,
S_IRUSR
|
S_IWUSR
);
if
(
fd
<
0
)
return
NULL
;
if
(
pad_to_size
)
#ifdef HAVE_LIBZ
if
(
dir_data
->
compression
>
0
)
{
gzfp
=
gzdopen
(
fd
,
"wb"
);
if
(
gzfp
==
NULL
)
{
close
(
fd
);
return
NULL
;
}
if
(
gzsetparams
(
gzfp
,
dir_data
->
compression
,
Z_DEFAULT_STRATEGY
)
!=
Z_OK
)
{
gzclose
(
gzfp
);
return
NULL
;
}
}
#endif
/* Do pre-padding on non-compressed files */
if
(
pad_to_size
&&
dir_data
->
compression
==
0
)
{
/* Always pre-pad on regular files */
char
*
zerobuf
;
int
bytes
;
...
...
@@ -120,12 +154,21 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
if
(
fsync_fname
(
tmppath
,
false
,
progname
)
!=
0
||
fsync_parent_path
(
tmppath
,
progname
)
!=
0
)
{
close
(
fd
);
#ifdef HAVE_LIBZ
if
(
dir_data
->
compression
>
0
)
gzclose
(
gzfp
);
else
#endif
close
(
fd
);
return
NULL
;
}
}
f
=
pg_malloc0
(
sizeof
(
DirectoryMethodFile
));
#ifdef HAVE_LIBZ
if
(
dir_data
->
compression
>
0
)
f
->
gzfp
=
gzfp
;
#endif
f
->
fd
=
fd
;
f
->
currpos
=
0
;
f
->
pathname
=
pg_strdup
(
pathname
);
...
...
@@ -144,7 +187,12 @@ dir_write(Walfile f, const void *buf, size_t count)
Assert
(
f
!=
NULL
);
r
=
write
(
df
->
fd
,
buf
,
count
);
#ifdef HAVE_LIBZ
if
(
dir_data
->
compression
>
0
)
r
=
(
ssize_t
)
gzwrite
(
df
->
gzfp
,
buf
,
count
);
else
#endif
r
=
write
(
df
->
fd
,
buf
,
count
);
if
(
r
>
0
)
df
->
currpos
+=
r
;
return
r
;
...
...
@@ -169,7 +217,12 @@ dir_close(Walfile f, WalCloseMethod method)
Assert
(
f
!=
NULL
);
r
=
close
(
df
->
fd
);
#ifdef HAVE_LIBZ
if
(
dir_data
->
compression
>
0
)
r
=
gzclose
(
df
->
gzfp
);
else
#endif
r
=
close
(
df
->
fd
);
if
(
r
==
0
)
{
...
...
@@ -180,17 +233,22 @@ dir_close(Walfile f, WalCloseMethod method)
* If we have a temp prefix, normal operation is to rename the
* file.
*/
snprintf
(
tmppath
,
sizeof
(
tmppath
),
"%s/%s%s"
,
dir_data
->
basedir
,
df
->
pathname
,
df
->
temp_suffix
);
snprintf
(
tmppath2
,
sizeof
(
tmppath2
),
"%s/%s"
,
dir_data
->
basedir
,
df
->
pathname
);
snprintf
(
tmppath
,
sizeof
(
tmppath
),
"%s/%s%s%s"
,
dir_data
->
basedir
,
df
->
pathname
,
dir_data
->
compression
>
0
?
".gz"
:
""
,
df
->
temp_suffix
);
snprintf
(
tmppath2
,
sizeof
(
tmppath2
),
"%s/%s%s"
,
dir_data
->
basedir
,
df
->
pathname
,
dir_data
->
compression
>
0
?
".gz"
:
""
);
r
=
durable_rename
(
tmppath
,
tmppath2
,
progname
);
}
else
if
(
method
==
CLOSE_UNLINK
)
{
/* Unlink the file once it's closed */
snprintf
(
tmppath
,
sizeof
(
tmppath
),
"%s/%s%s"
,
dir_data
->
basedir
,
df
->
pathname
,
df
->
temp_suffix
?
df
->
temp_suffix
:
""
);
snprintf
(
tmppath
,
sizeof
(
tmppath
),
"%s/%s%s%s"
,
dir_data
->
basedir
,
df
->
pathname
,
dir_data
->
compression
>
0
?
".gz"
:
""
,
df
->
temp_suffix
?
df
->
temp_suffix
:
""
);
r
=
unlink
(
tmppath
);
}
else
...
...
@@ -226,6 +284,14 @@ dir_sync(Walfile f)
if
(
!
dir_data
->
sync
)
return
0
;
#ifdef HAVE_LIBZ
if
(
dir_data
->
compression
>
0
)
{
if
(
gzflush
(((
DirectoryMethodFile
*
)
f
)
->
gzfp
,
Z_SYNC_FLUSH
)
!=
Z_OK
)
return
-
1
;
}
#endif
return
fsync
(((
DirectoryMethodFile
*
)
f
)
->
fd
);
}
...
...
@@ -277,7 +343,7 @@ dir_finish(void)
WalWriteMethod
*
CreateWalDirectoryMethod
(
const
char
*
basedir
,
bool
sync
)
CreateWalDirectoryMethod
(
const
char
*
basedir
,
int
compression
,
bool
sync
)
{
WalWriteMethod
*
method
;
...
...
@@ -293,6 +359,7 @@ CreateWalDirectoryMethod(const char *basedir, bool sync)
method
->
getlasterror
=
dir_getlasterror
;
dir_data
=
pg_malloc0
(
sizeof
(
DirectoryMethodData
));
dir_data
->
compression
=
compression
;
dir_data
->
basedir
=
pg_strdup
(
basedir
);
dir_data
->
sync
=
sync
;
...
...
src/bin/pg_basebackup/walmethods.h
View file @
cada1af3
...
...
@@ -41,7 +41,8 @@ struct WalWriteMethod
* (only implements the methods required for pg_basebackup,
* not all those required for pg_receivexlog)
*/
WalWriteMethod
*
CreateWalDirectoryMethod
(
const
char
*
basedir
,
bool
sync
);
WalWriteMethod
*
CreateWalDirectoryMethod
(
const
char
*
basedir
,
int
compression
,
bool
sync
);
WalWriteMethod
*
CreateWalTarMethod
(
const
char
*
tarbase
,
int
compression
,
bool
sync
);
/* Cleanup routines for previously-created methods */
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment