aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.md3
-rw-r--r--pgbak.c288
-rw-r--r--test.py47
3 files changed, 211 insertions, 127 deletions
diff --git a/README.md b/README.md
index 84ae7a4..c22cedd 100644
--- a/README.md
+++ b/README.md
@@ -8,8 +8,6 @@ Data is stored in a local directory provided through the `PGBAK` environment var
Some tasks are performed by a "sync" process that is automatically started in the background when necessary. This includes taking full snapshots and running the backup script.
-Note that `pgbak` is not a full point-in-time recovery archiver. In some cases, for example during snapshots, some WAL files may be missed.
-
## Backup directory structure
- `$PGBAK/TIMESTAMP/` — One or more directories containing base backups and WAL segments. The `TIMESTAMP` is when the base backup was taken.
@@ -38,6 +36,7 @@ When the `backup` script is started, the current directory is set to the subdire
1. the timestamp of the base backup
2. the current timestamp
+3. an optional `old` flag if this is the last invocation of the backup script before switching to a newer snapshot
Existing files will never disappear or change, provided the base backup timestamp is the same. However, new compressed WAL files may appear at any time. If this happens while the `backup` script is running, it will be called again with a refreshed current timestamp.
diff --git a/pgbak.c b/pgbak.c
index 8fa265f..876fff9 100644
--- a/pgbak.c
+++ b/pgbak.c
@@ -48,13 +48,25 @@ static const char *const LOCK_KIND[] = {
// ===== State flags =====
-#define STATE_SHIFT 4 // low=pending, high=running
-#define STATE_MASK 0x0f
+#define STATE_MASK 0x7
// We need to run the backup script.
-#define STATE_ARCHIVE 0x01
-// A new full snapshot needs to be made.
-#define STATE_SNAPSHOT 0x02
+#define S_WANT_ARCHIVE 0x08
+// A "next" full snapshot should be made unless one is in progress.
+#define S_WANT_SNAPSHOT 0x10
+// A "next" full snapshot needs to be made.
+#define S_WANT_NEW_SNAPSHOT 0x20
+
+// Done.
+#define STATE_IDLE 0x0
+// Run the backup script, goto STATE_IDLE
+#define STATE_ARCHIVE 0x1
+// Rename next->current, goto STATE_ARCHIVE
+#define STATE_PROMOTE 0x2
+// Run the backup script, goto STATE_PROMOTE
+#define STATE_PREARCHIVE 0x3
+// Take a new snapshot, goto STATE_PREARCHIVE
+#define STATE_SNAPSHOT 0x4
static void lck_release(int pos) {
@@ -196,15 +208,6 @@ static int cld_pipe(int in_fd, int out_fd, const char *const *argv) {
static int bak_snapshot(char *name) {
- {
- struct timespec ts;
- if(clock_gettime(CLOCK_REALTIME, &ts) < 0) {
- fprintf(stderr, "clock_gettime(CLOCK_REALTIME): %m\n");
- abort();
- }
- sprintf(name, "%lld", (long long)ts.tv_sec);
- }
-
if(mkdirat(backup_dfd, name, 0700) < 0) {
fprintf(stderr, "Failed to create $PGBAK/%s: %m\n", name);
return -1;
@@ -228,9 +231,9 @@ static int bak_snapshot(char *name) {
goto fail_clean;
}
- (void)unlinkat(backup_dfd, "current", 0);
- if(symlinkat(name, backup_dfd, "current") < 0) {
- fprintf(stderr, "Failed to update $PGBAK/current: %m\n");
+ (void)unlinkat(backup_dfd, "next", 0);
+ if(symlinkat(name, backup_dfd, "next") < 0) {
+ fprintf(stderr, "Failed to update $PGBAK/next: %m\n");
goto fail;
}
@@ -266,9 +269,12 @@ static int bak_snapshot(char *name) {
fprintf(stderr, "fsync(): %m\n");
abort();
}
- return subdfd;
+ close(subdfd);
+
+ return 0;
fail:
+ (void)unlinkat(backup_dfd, "next", 0);
close(out_fd);
fail_clean:
(void)unlinkat(subdfd, "pg_wal", AT_REMOVEDIR);
@@ -283,11 +289,11 @@ static bool should_pgbasebackup(int subdfd) {
struct stat st;
if(fstatat(subdfd, BASE_OUT, &st, AT_SYMLINK_NOFOLLOW) < 0) {
- fprintf(stderr, "Failed to stat %s: %m\n", BASE_OUT);
+ fprintf(stderr, "Failed to stat %s: %m; creating new snapshot\n", BASE_OUT);
return true;
}
if(!S_ISREG(st.st_mode)) {
- fprintf(stderr, "Expected regular file at %s\n", BASE_OUT);
+ fprintf(stderr, "Expected regular file at %s; creating new snapshot\n", BASE_OUT);
return true;
}
@@ -297,7 +303,7 @@ static bool should_pgbasebackup(int subdfd) {
int wal_dfd = openat(subdfd, "pg_wal", O_RDONLY | O_DIRECTORY | O_CLOEXEC);
if(wal_dfd < 0) {
- fprintf(stderr, "Failed to open $PGBAK/current/pg_wal: %m\n");
+ fprintf(stderr, "Failed to open $PGBAK/current/pg_wal: %m; creating new snapshot\n");
return true;
}
@@ -323,90 +329,135 @@ static bool should_pgbasebackup(int subdfd) {
}
closedir(wal_dir);
- return wal_count > 8 && (wal_size >> 1) >= base_size;
+ if(wal_count > 8 && (wal_size >> 1) >= base_size) {
+ fprintf(stderr, "WAL directroy too big; creating new snapshot\n");
+ return true;
+ }
+ return false;
+}
+
+static int bak_open_current(char *name, size_t namebuf) {
+ ssize_t r = readlinkat(backup_dfd, "current", name, namebuf);
+ if(r < 0) {
+ if(errno != ENOENT) {
+ fprintf(stderr, "Failed to readlink() $PGBAK/current: %m\n");
+ return -1;
+ }
+ fprintf(stderr, "Failed to open() $PGBAK/current: %m\n");
+ return -1;
+ }
+ if((size_t)r >= namebuf) {
+ fprintf(stderr, "Symlink $PGBAK/current target too long\n");
+ return -1;
+ }
+ name[r] = 0;
+ int subdfd = openat(backup_dfd, name, O_RDONLY | O_DIRECTORY | O_CLOEXEC);
+ if(subdfd < 0) {
+ fprintf(stderr, "Failed to open() $PGBAK/%s: %m\n", name);
+ }
+ return subdfd;
}
// Archiver. Called in a subprocess with LOCK_STATE.
static void bak_work(int state) {
unsigned int backoff = 4 * 512;
- char name[32];
- char ts_s[32];
- const char *const cmd[] = {
- "../scripts/backup", name, ts_s, NULL,
+ char cur_name[32];
+ char ts_name[32];
+ const char *archive_cmd[] = {
+ "../scripts/backup", cur_name, ts_name, NULL, NULL,
};
lck_wait(LOCK_WAIT);
bool did_snapshot = false;
- int subdfd = -1;
bool refresh_ts = true;
+ int flags = state;
+ state &= STATE_MASK;
// Skip reopening if a full snapshot was requested.
while(true) {
- if((state & STATE_ARCHIVE) && subdfd < 0) {
- ssize_t r = readlinkat(backup_dfd, "current", name, sizeof(name));
- if(r < 0) {
- if(errno != ENOENT) {
- fprintf(stderr, "Failed to readlink() $PGBAK/current: %m\n");
- _Exit(1);
- }
+ switch(state) {
+ case STATE_IDLE:
+ if(flags & S_WANT_ARCHIVE) state = STATE_ARCHIVE;
+ [[fallthrough]];
+ case STATE_ARCHIVE:
+ if(flags & S_WANT_SNAPSHOT) state = STATE_SNAPSHOT;
+ [[fallthrough]];
+ case STATE_PROMOTE:
+ case STATE_PREARCHIVE:
+ if(flags & S_WANT_NEW_SNAPSHOT) state = STATE_SNAPSHOT;
+ break;
+ default:
state = STATE_SNAPSHOT;
- } else {
- if((size_t)r >= sizeof(name)) {
- fprintf(stderr, "Symlink $PGBAK/current target too long\n");
- _Exit(1);
- }
- name[r] = 0;
- subdfd = openat(backup_dfd, name, O_RDONLY | O_DIRECTORY | O_CLOEXEC);
- if(subdfd < 0) {
- fprintf(stderr, "Failed to open() $PGBAK/%s: %m\n", name);
- state = STATE_SNAPSHOT;
- }
- }
}
+ lck_write(state);
+ lck_release(LOCK_STATE);
+
+ if(state == STATE_IDLE) break;
- if((state & STATE_ARCHIVE) && refresh_ts) {
+ if(state == STATE_SNAPSHOT || refresh_ts) {
refresh_ts = false;
struct timespec ts;
if(clock_gettime(CLOCK_REALTIME, &ts) < 0) {
fprintf(stderr, "clock_gettime(CLOCK_REALTIME): %m\n");
abort();
}
- sprintf(ts_s, "%lld", (long long)ts.tv_sec);
+ sprintf(ts_name, "%lld", (long long)ts.tv_sec);
}
- lck_write(state << STATE_SHIFT);
- lck_release(LOCK_STATE);
-
const char *failed = NULL;
- if(state & STATE_ARCHIVE) {
- pid_t child = cld_spawn(-1, -1, subdfd, cmd);
- if(cld_wait(child, cmd[0])) {
- failed = "backup script";
- } else {
- fprintf(stderr, "Finished archive %s-%s\n", name, ts_s);
- }
- if(!(state & STATE_SNAPSHOT) && !did_snapshot && should_pgbasebackup(subdfd)) {
- state |= STATE_SNAPSHOT;
- }
- if(!failed) {
- state &= ~STATE_ARCHIVE;
- }
- } else if(state & STATE_SNAPSHOT) {
- if(subdfd >= 0) {
- close(subdfd);
- }
- subdfd = bak_snapshot(name);
- if(subdfd < 0) {
- failed = "pg_basebackup";
- } else {
- fprintf(stderr, "Finished snapshot %s\n", name);
+ switch(state) {
+ case STATE_SNAPSHOT:
+ if(bak_snapshot(ts_name) < 0) {
+ failed = "pg_basebackup";
+ break;
+ }
+ fprintf(stderr, "Finished snapshot %s\n", ts_name);
did_snapshot = true;
- refresh_ts = true;
+ state = STATE_PREARCHIVE;
+ break;
+ case STATE_PROMOTE:
+ if(renameat(backup_dfd, "next", backup_dfd, "current") < 0) {
+ fprintf(stderr, "Failed to promote $PGBAK/next to current: %m\n");
+ state = STATE_SNAPSHOT;
+ break;
+ }
+ if(fsync(backup_dfd) < 0) {
+ fprintf(stderr, "fsync(): %m\n");
+ abort();
+ }
state = STATE_ARCHIVE;
+ break;
+ case STATE_ARCHIVE:
+ case STATE_PREARCHIVE:
+ {
+ int subdfd = bak_open_current(cur_name, sizeof(cur_name));
+ if(subdfd < 0) {
+ if(state == STATE_ARCHIVE) {
+ state = STATE_SNAPSHOT;
+ } else {
+ state = STATE_PROMOTE;
+ }
+ break;
+ }
+ archive_cmd[3] = (state == STATE_PREARCHIVE) ? "old" : NULL;
+ pid_t child = cld_spawn(-1, -1, subdfd, archive_cmd);
+ if(cld_wait(child, archive_cmd[0])) {
+ close(subdfd);
+ failed = "backup script";
+ break;
+ }
+ fprintf(stderr, "Finished archive %s-%s\n", cur_name, ts_name);
+ if(state == STATE_PREARCHIVE) {
+ state = STATE_PROMOTE;
+ } else if(!did_snapshot && should_pgbasebackup(subdfd)) {
+ state = STATE_SNAPSHOT;
+ } else {
+ state = STATE_IDLE;
+ }
+ close(subdfd);
+ break;
}
- } else {
- break;
}
if(failed) {
@@ -427,9 +478,7 @@ static void bak_work(int state) {
}
lck_wait(LOCK_STATE);
- int nstate = lck_read();
- state |= nstate & STATE_MASK;
- if(nstate & STATE_ARCHIVE) refresh_ts = true;
+ flags = lck_read();
}
}
@@ -439,16 +488,27 @@ static int bak_begin(int flags) {
lck_wait(LOCK_STATE);
int state = lck_read();
+ state |= flags;
if(!lck_try(LOCK_BACKUP)) {
if(flags & ~state) {
- lck_write(state | flags);
+ lck_write(state);
}
+ lck_release(LOCK_STATE);
return -1;
}
// We got the backup lock. If anything was running, it must have crashed.
// Mark it as pending.
- return flags | (state & STATE_MASK) | (state >> STATE_SHIFT);
+ switch(state & STATE_MASK) {
+ case STATE_SNAPSHOT:
+ fprintf(stderr, "Resuming interrupted snapshot\n");
+ break;
+ case STATE_PROMOTE:
+ case STATE_ARCHIVE:
+ fprintf(stderr, "Resuming interrupted backup\n");
+ break;
+ }
+ return state;
}
static void on_sigalrm(int) {
@@ -475,32 +535,47 @@ static int cmd_wal(const char *name) {
return 1;
}
- int flags = STATE_ARCHIVE;
- int wal_dfd = openat(backup_dfd, "current/pg_wal", O_RDONLY | O_DIRECTORY | O_CLOEXEC);
- if(wal_dfd < 0) {
- if(errno == ENOENT) {
- fprintf(stderr, "Directory $PGBAK/current/pg_wal does not exist; skipping %s and doing snapshot\n", name);
- goto skip;
+ int flags = S_WANT_ARCHIVE;
+ // current is renamed to next, so by opening next first, we ensure that they
+ // are distinct.
+ int next_dfd = openat(backup_dfd, "next/pg_wal", O_RDONLY | O_DIRECTORY | O_CLOEXEC);
+ if(next_dfd < 0 && errno != ENOENT) {
+ fprintf(stderr, "Failed to open $PGBAK/next/pg_wal: %m; will force new snapshot\n");
+ flags |= S_WANT_NEW_SNAPSHOT;
+ }
+ int cur_dfd = openat(backup_dfd, "current/pg_wal", O_RDONLY | O_DIRECTORY | O_CLOEXEC);
+ if(cur_dfd < 0) {
+ if(errno != ENOENT) {
+ fprintf(stderr, "Failed to open directory $PGBAK/current/pg_wal: %m\n");
+ return 1;
+ }
+ fprintf(stderr, "Directory $PGBAK/current/pg_wal does not exist; will request new snapshot\n");
+ if(next_dfd < 0) {
+ flags = S_WANT_SNAPSHOT;
}
- fprintf(stderr, "Failed to open directory $PGBAK/current/pg_wal: %m\n");
- return 1;
+ }
+ if(cur_dfd < 0 && next_dfd < 0) {
+ fprintf(stderr, "Skipping %s\n", name);
+ goto skip;
}
char name_out[256];
memcpy(name_out, name + 7, name_len - 7);
memcpy(name_out + name_len - 7, WAL_EXT, strlen(WAL_EXT) + 1);
- if(faccessat(wal_dfd, name_out, F_OK, AT_SYMLINK_NOFOLLOW) >= 0) {
- // Assume we've gone back in time
+ if(
+ (cur_dfd >= 0 && faccessat(cur_dfd, name_out, F_OK, AT_SYMLINK_NOFOLLOW) >= 0) ||
+ (next_dfd >= 0 && faccessat(next_dfd, name_out, F_OK, AT_SYMLINK_NOFOLLOW) >= 0)
+ ) {
+ // Assume we've gone back in WAL history
fprintf(stderr, "Backup of %s already exists; forcing snapshot\n", name);
- close(wal_dfd);
- // Prevent misdirected WALs or double snapshot
- (void) unlinkat(backup_dfd, "current", 0);
- flags = STATE_SNAPSHOT;
+ if(cur_dfd >= 0) close(cur_dfd);
+ if(next_dfd >= 0) close(next_dfd);
+ flags = S_WANT_NEW_SNAPSHOT;
goto skip;
}
- int out_fd = openat(wal_dfd, ".", O_WRONLY | O_TMPFILE | O_CLOEXEC, 0600);
+ int out_fd = openat(backup_dfd, ".", O_WRONLY | O_TMPFILE | O_CLOEXEC, 0600);
if(out_fd < 0) {
fprintf(stderr, "Failed to create WAL backup file at $PGBAK/current: %m\n");
goto fail_dir;
@@ -521,17 +596,25 @@ static int cmd_wal(const char *name) {
char fd_path[32];
sprintf(fd_path, "/proc/self/fd/%d", out_fd);
- if(linkat(AT_FDCWD, fd_path, wal_dfd, name_out, AT_SYMLINK_FOLLOW) < 0) {
+ if(cur_dfd >= 0 && linkat(AT_FDCWD, fd_path, cur_dfd, name_out, AT_SYMLINK_FOLLOW) < 0) {
+ fprintf(stderr, "Failed to link WAL backup: %m\n");
+ goto fail;
+ }
+ if(next_dfd >= 0 && linkat(AT_FDCWD, fd_path, next_dfd, name_out, AT_SYMLINK_FOLLOW) < 0) {
fprintf(stderr, "Failed to link WAL backup: %m\n");
goto fail;
}
close(out_fd);
- if(fsync(wal_dfd) < 0) {
+ if(cur_dfd >= 0 && fsync(cur_dfd) < 0) {
+ fprintf(stderr, "fsync(): %m\n");
+ goto fail_dir;
+ }
+ if(next_dfd >= 0 && fsync(next_dfd) < 0) {
fprintf(stderr, "fsync(): %m\n");
goto fail_dir;
}
- close(wal_dfd);
+ close(cur_dfd);
skip:
flags = bak_begin(flags);
@@ -551,14 +634,15 @@ skip:
fail:
close(out_fd);
fail_dir:
- close(wal_dfd);
+ if(cur_dfd >= 0) close(cur_dfd);
+ if(next_dfd >= 0) close(next_dfd);
return 1;
}
static int cmd_sync(bool force_archive, bool force_snapshot) {
int flags = 0;
- if(force_archive) flags |= STATE_ARCHIVE;
- if(force_snapshot) flags |= STATE_SNAPSHOT;
+ if(force_archive) flags |= S_WANT_ARCHIVE;
+ if(force_snapshot) flags |= S_WANT_NEW_SNAPSHOT;
flags = bak_begin(flags);
if(flags < 0) {
return cmd_wait(-1);
diff --git a/test.py b/test.py
index 163c589..ec0c5b8 100644
--- a/test.py
+++ b/test.py
@@ -25,8 +25,7 @@ class Test(unittest.TestCase):
self._wal_req = threading.Condition(self._wal_lock)
self._wal_done = threading.Condition(self._wal_lock)
- self._bak_relax = False
- self._bak_list = []
+ self._bak_list = [(0, 0)]
self._prog = os.path.realpath('./pgbak')
self._sock,self._sock_prog = socket.socketpair(socket.AF_UNIX)
@@ -42,12 +41,12 @@ class Test(unittest.TestCase):
os.mkdir(os.path.join(self._db_path, 'pg_wal'))
with open(os.path.join(self._bin_path, 'pg_basebackup'), 'w') as f:
- f.write('#!/bin/sh\necho -n r >&0; read t; cat base; sleep "$t"; [ "$((RANDOM%200))" != 0 ]\n')
+ f.write('#!/bin/sh\necho -n r >&0; read t; [ "$t" != k ] || exec kill -9 "$PPID"; cat base; sleep "$t"; [ "$((RANDOM%200))" != 0 ]\n')
os.fchmod(f.fileno(), 0o755)
with open(os.path.join(self._bak_path, 'scripts', 'backup'), 'w') as f:
shp = shlex.quote(self._tmpdir)
- f.write(f'#!/bin/sh\nset -e; sleep 0.1; [ "$((RANDOM%200))" == 0 ] && exit 1; tar -c . > {shp}/bak-dir.tar; echo -n w >&0; read t\n')
+ f.write(f'#!/bin/sh\nset -e; sleep 0.1; [ "$((RANDOM%200))" == 0 ] && exit 1; tar -c . > {shp}/bak-dir.tar; echo -n w >&0; read t; [ "$t" != k ] || kill -9 "$PPID"\n')
os.fchmod(f.fileno(), 0o755)
self._wal_thr = threading.Thread(target=self._run_wal)
@@ -84,7 +83,11 @@ class Test(unittest.TestCase):
if op == b'w':
self._bak_read()
- self._sock.send(b'.\n')
+ if random.randrange(10) == 0:
+ print('>>> killing backup')
+ self._sock.send(b'k\n')
+ else:
+ self._sock.send(b'.\n')
continue
if op == b'r':
@@ -92,9 +95,13 @@ class Test(unittest.TestCase):
i = self._wal_make()
with open(os.path.join(self._db_path, 'base'), 'wb') as f:
f.write(f'up to {i}\n'.encode() + os.urandom(20480))
- t = random.random() * 0.2
- self._wal_make()
- self._sock.send(f'{t}\n'.encode())
+ if random.randrange(10) == 0:
+ print('>>> killing snapshot')
+ self._sock.send(b'k\n')
+ else:
+ t = random.random() * 0.2
+ self._wal_make()
+ self._sock.send(f'{t}\n'.encode())
continue
raise RuntimeError(f'bad command {op!r}')
@@ -154,10 +161,11 @@ class Test(unittest.TestCase):
while end in wals:
end += 1
end -= 1
- if end < begin + 1 and not self._bak_relax:
+ if end < begin + 1:
raise RuntimeError('missing first WAL')
- self._bak_list.append(end)
+ assert len(self._bak_list) == 1 or begin <= self._bak_list[-1][1]
+ self._bak_list.append((begin, end))
def _run(self, *args):
env = {**os.environ}
@@ -170,19 +178,12 @@ class Test(unittest.TestCase):
i = self._wal_make()
if random.randrange(20) == 0:
self._run('wait')
- if not self._bak_relax:
- self.assertGreaterEqual(self._bak_list[-1], i)
- self._bak_relax = False
-
- if random.randrange(30) == 0:
- # Relax the backup consistency requirement until the end of the next sync
- self._bak_relax = True
- try:
- os.unlink(os.path.join(self._bak_path, 'current'))
- except FileNotFoundError:
- pass
- else:
- print('>>> forcing full backup')
+ self.assertGreaterEqual(self._bak_list[-1][1], i)
+
+ elif random.randrange(30) == 0:
+ print('>>> forcing full backup')
+ self._run('full-sync')
+ self.assertGreaterEqual(self._bak_list[-1][1], i)
if __name__ == '__main__':
unittest.main()