aboutsummaryrefslogtreecommitdiff
path: root/pgbak.c
diff options
context:
space:
mode:
Diffstat (limited to 'pgbak.c')
-rw-r--r--pgbak.c569
1 files changed, 569 insertions, 0 deletions
diff --git a/pgbak.c b/pgbak.c
new file mode 100644
index 0000000..439d643
--- /dev/null
+++ b/pgbak.c
@@ -0,0 +1,569 @@
+// SPDX-License-Identifier: LGPL-3.0-or-later
+//
+// Copyright 2021 Hristo Venev
+
+#define _GNU_SOURCE
+#include <dirent.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <limits.h>
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/file.h>
+#include <sys/stat.h>
+#include <sys/wait.h>
+#include <time.h>
+#include <unistd.h>
+
+#include "config.h"
+
+static int backup_dfd; // Directory fd for $PGBAK, opened in main().
+static int lck_fd; // fd for $PGBAK/pgbak.lock, opened in main(); carries the OFD byte locks and the state byte.
+
+
+static const char *const LOCK_KIND[] = { // Lock names for error messages; the index is the byte offset locked in the lock file.
+ // The `state` lock (byte 0) must be held while doing state-related updates, for
+ // example modifying the "archive needed" flag, which is kept in byte 0 of the lock file.
+ "state",
+
+ // The `backup` lock (byte 1) is held by the archiver process.
+ "backup",
+
+ // The `wait` lock (byte 2) is acquired by the archiver process after the `backup`
+ // lock. It is also acquired by processes that wait for the archiver to
+ // finish.
+ "wait",
+};
+
+// Release the lock on byte `pos` of the lock file.
+//
+// The request is retried when interrupted by a signal. Any other failure is
+// reported on stderr (using the LOCK_KIND name for `pos`) and aborts.
+static void lck_release(int pos) {
+    struct flock unlock_req = {
+        .l_type = F_UNLCK,
+        .l_whence = SEEK_SET,
+        .l_start = pos,
+        .l_len = 1,
+    };
+
+    while(fcntl(lck_fd, F_OFD_SETLKW, &unlock_req) < 0) {
+        if(errno != EINTR) {
+            fprintf(stderr, "Failed to release %s lock: %m\n", LOCK_KIND[pos]);
+            abort();
+        }
+    }
+}
+
+// Block until a write lock on byte `pos` of the lock file has been acquired.
+//
+// Interrupted waits are restarted. Any other failure is reported on stderr
+// (using the LOCK_KIND name for `pos`) and aborts.
+static void lck_wait(int pos) {
+    struct flock lock_req = {
+        .l_type = F_WRLCK,
+        .l_whence = SEEK_SET,
+        .l_start = pos,
+        .l_len = 1,
+    };
+
+    while(fcntl(lck_fd, F_OFD_SETLKW, &lock_req) < 0) {
+        if(errno != EINTR) {
+            fprintf(stderr, "Failed to acquire %s lock: %m\n", LOCK_KIND[pos]);
+            abort();
+        }
+    }
+}
+
+// Try to take a write lock on byte `pos` of the lock file without blocking.
+//
+// Returns true when the lock was acquired and false when the request was
+// refused with EAGAIN or EACCES. Interrupted attempts are retried; any other
+// error is reported on stderr and aborts.
+static bool lck_try(int pos) {
+    struct flock lock_req = {
+        .l_type = F_WRLCK,
+        .l_whence = SEEK_SET,
+        .l_start = pos,
+        .l_len = 1,
+    };
+
+    while(fcntl(lck_fd, F_OFD_SETLK, &lock_req) < 0) {
+        if(errno == EINTR) continue;
+        if(errno == EAGAIN || errno == EACCES) return false;
+        fprintf(stderr, "Failed to acquire %s lock: %m\n", LOCK_KIND[pos]);
+        abort();
+    }
+    return true;
+}
+
+// Read the byte stored at offset `pos` of the lock file.
+//
+// Returns the byte value (0..255), or -1 when the file has no byte at that
+// offset. A read error is reported on stderr and aborts.
+static int lck_read(int pos) {
+    unsigned char byte;
+    ssize_t got = pread(lck_fd, &byte, 1, pos);
+
+    if(got >= 1) return byte;
+    if(got == 0) return -1;
+
+    fprintf(stderr, "Failed to read lock file: %m\n");
+    abort();
+}
+
+// Store the single byte `b` at offset `pos` of the lock file.
+//
+// Both a failed write and a zero-length write are fatal: the problem is
+// reported on stderr and the process aborts.
+static void lck_write(int pos, unsigned char b) {
+    ssize_t r = pwrite(lck_fd, &b, 1, pos);
+    if(r < 0) {
+        // This is the write path; the message used to say "read".
+        fprintf(stderr, "Failed to write lock file: %m\n");
+        abort();
+    }
+    if(r == 0) {
+        fprintf(stderr, "Failed to write lock file: Empty write\n");
+        abort();
+    }
+}
+
+// Fork and exec `argv` (looked up via execvpe with the current environment).
+//
+// In the child, each non-negative argument is applied before the exec:
+// `chdir_fd` becomes the working directory, `in_fd` becomes stdin and
+// `out_fd` becomes stdout. If any of these steps or the exec itself fails,
+// the child exits with status 1.
+//
+// Returns the child's pid in the parent. A fork failure is fatal.
+static pid_t cld_spawn(int in_fd, int out_fd, int chdir_fd, const char *const *argv) {
+    pid_t pid = fork();
+    if(pid > 0) return pid;
+
+    if(pid < 0) {
+        fprintf(stderr, "fork(): %m\n");
+        abort();
+    }
+
+    // Child process from here on.
+    bool ready =
+        (chdir_fd < 0 || fchdir(chdir_fd) >= 0) &&
+        (in_fd < 0 || dup2(in_fd, 0) >= 0) &&
+        (out_fd < 0 || dup2(out_fd, 1) >= 0);
+    if(ready) {
+        execvpe(argv[0], (char*const*)argv, environ);
+    }
+    _exit(1);
+}
+
+// Wait for `child` to terminate and translate its status into a return code.
+//
+// `cld` is the name used in diagnostics. Returns the exit status for a normal
+// exit (logging it when non-zero) and 128 + signal number when the child was
+// killed by a signal. A waitpid() failure or an unrecognised status aborts.
+static int cld_wait(pid_t child, const char *cld) {
+    int status;
+    if(waitpid(child, &status, 0) < 0) {
+        fprintf(stderr, "waitpid(): %m\n");
+        abort();
+    }
+
+    if(WIFEXITED(status)) {
+        int code = WEXITSTATUS(status);
+        if(code != 0) {
+            fprintf(stderr, "Child process '%s' exited with status %d\n", cld, code);
+        }
+        return code;
+    }
+
+    if(WIFSIGNALED(status)) {
+        int sig = WTERMSIG(status);
+        fprintf(stderr, "Child process '%s' killed by signal %d\n", cld, sig);
+        return 128 + sig;
+    }
+
+    fprintf(stderr, "waitpid(): Unknown status\n");
+    abort();
+}
+
+// Run `argv` as a filter from `in_fd` to `out_fd` and wait for it to finish.
+//
+// The parent's copy of `in_fd` is closed as soon as the child is running.
+// When the child succeeds and `out_fd` is valid, the output is fsync()ed.
+//
+// Returns 0 on success, the child's non-zero status on failure, or -1 when
+// the fsync fails.
+static int cld_pipe(int in_fd, int out_fd, const char *const *argv) {
+    pid_t pid = cld_spawn(in_fd, out_fd, -1, argv);
+    if(in_fd >= 0) close(in_fd);
+
+    int rc = cld_wait(pid, argv[0]);
+    if(rc != 0 || out_fd < 0) return rc;
+
+    if(fsync(out_fd) >= 0) return rc;
+
+    fprintf(stderr, "fsync(): %m\n");
+    return -1;
+}
+
+
+// Create a new snapshot directory named after the current Unix time, point
+// $PGBAK/current at it and store a compressed base backup inside it.
+//
+// `name` receives the directory name (decimal seconds); the caller provides
+// the buffer.
+//
+// Returns an open directory fd for the new snapshot on success. On failure
+// the error is logged, the new directory is removed on a best-effort basis
+// (the `current` symlink is not restored) and -1 is returned. Failures of
+// clock_gettime() or of the final fsync() abort the process.
+static int bak_snapshot(char *name) {
+    {
+        struct timespec ts;
+        if(clock_gettime(CLOCK_REALTIME, &ts) < 0) {
+            fprintf(stderr, "clock_gettime(CLOCK_REALTIME): %m\n");
+            abort();
+        }
+        sprintf(name, "%lld", (long long)ts.tv_sec);
+    }
+
+    if(mkdirat(backup_dfd, name, 0700) < 0) {
+        fprintf(stderr, "Failed to create $PGBAK/%s: %m\n", name);
+        return -1;
+    }
+
+    int subdfd = openat(backup_dfd, name, O_RDONLY | O_DIRECTORY | O_CLOEXEC);
+    if(subdfd < 0) {
+        fprintf(stderr, "Failed to open $PGBAK/%s: %m\n", name);
+        goto fail_rmdir;
+    }
+
+    if(mkdirat(subdfd, "pg_wal", 0700) < 0) {
+        fprintf(stderr, "Failed to create $PGBAK/%s/pg_wal: %m\n", name);
+        goto fail_close;
+    }
+
+    // The backup is written to an anonymous file and only linked into the
+    // directory once it is complete, so a partial file is never visible.
+    int out_fd = openat(subdfd, ".", O_WRONLY | O_TMPFILE | O_CLOEXEC, 0600);
+    if(out_fd < 0) {
+        fprintf(stderr, "Failed to create $PGBAK/%s/%s: %m\n", name, BASE_OUT);
+        goto fail_clean;
+    }
+
+    // Switch `current` before the backup starts, so `pg_wal` of the new
+    // snapshot is what cmd_wal() sees from here on.
+    (void)unlinkat(backup_dfd, "current", 0);
+    if(symlinkat(name, backup_dfd, "current") < 0) {
+        fprintf(stderr, "Failed to update $PGBAK/current: %m\n");
+        goto fail;
+    }
+
+    // Close-on-exec keeps the raw pipe ends out of the children; each child
+    // only gets the end that cld_spawn() dup2()s onto its stdin/stdout.
+    // Otherwise the producer would keep the read end open itself and never
+    // see EPIPE if the compressor went away.
+    int pfd[2];
+    if(pipe2(pfd, O_CLOEXEC) < 0) {
+        fprintf(stderr, "pipe(): %m\n");
+        goto fail;
+    }
+
+    fprintf(stderr, "Creating full backup in $PGBAK/%s/%s\n", name, BASE_OUT);
+
+    pid_t pid_backup = cld_spawn(-1, pfd[1], -1, BASE_BACKUP_ARGV);
+    close(pfd[1]);
+
+    // cld_pipe() closes pfd[0] in this process once the compressor runs.
+    if(cld_pipe(pfd[0], out_fd, BASE_COMPRESS_ARGV)) {
+        (void)kill(pid_backup, SIGKILL);
+        (void)waitpid(pid_backup, NULL, 0);
+        goto fail;
+    }
+    if(cld_wait(pid_backup, BASE_BACKUP_ARGV[0])) {
+        goto fail;
+    }
+
+    // Give the finished anonymous file its name.
+    char fd_path[32];
+    snprintf(fd_path, sizeof(fd_path), "/proc/self/fd/%d", out_fd);
+    if(linkat(AT_FDCWD, fd_path, subdfd, BASE_OUT, AT_SYMLINK_FOLLOW) < 0) {
+        fprintf(stderr, "Failed to link base backup: %m\n");
+        goto fail;
+    }
+
+    close(out_fd);
+    if(fsync(subdfd) < 0 || fsync(backup_dfd) < 0) {
+        fprintf(stderr, "fsync(): %m\n");
+        abort();
+    }
+    return subdfd;
+
+fail:
+    close(out_fd);
+fail_clean:
+    (void)unlinkat(subdfd, "pg_wal", AT_REMOVEDIR);
+fail_close:
+    close(subdfd);
+fail_rmdir:
+    (void)unlinkat(backup_dfd, name, AT_REMOVEDIR);
+    return -1;
+}
+
+// Decide whether the snapshot behind `subdfd` should be replaced by a new
+// base backup.
+//
+// Returns true when the base backup file is missing or not a regular file,
+// when `pg_wal` cannot be opened, or when `pg_wal` holds more than 8 entries
+// whose combined size is at least twice the size of the base backup.
+// Entries that cannot be stat()ed are counted but contribute no size.
+// fdopendir() and readdir() failures abort.
+static bool should_pgbasebackup(int subdfd) {
+    struct stat st;
+
+    if(fstatat(subdfd, BASE_OUT, &st, AT_SYMLINK_NOFOLLOW) < 0) {
+        fprintf(stderr, "Failed to stat %s: %m\n", BASE_OUT);
+        return true;
+    }
+    if(!S_ISREG(st.st_mode)) {
+        fprintf(stderr, "Expected regular file at %s\n", BASE_OUT);
+        return true;
+    }
+    off_t base_size = st.st_size;
+
+    int wal_dfd = openat(subdfd, "pg_wal", O_RDONLY | O_DIRECTORY | O_CLOEXEC);
+    if(wal_dfd < 0) {
+        fprintf(stderr, "Failed to open $PGBAK/current/pg_wal: %m\n");
+        return true;
+    }
+
+    // The DIR stream takes over wal_dfd; closedir() below closes it.
+    DIR *wal_dir = fdopendir(wal_dfd);
+    if(wal_dir == NULL) {
+        fprintf(stderr, "fdopendir(): %m\n");
+        abort();
+    }
+
+    off_t wal_bytes = 0;
+    off_t wal_files = 0;
+    for(;;) {
+        // readdir() reports both end-of-directory and errors as NULL;
+        // errno tells them apart.
+        errno = 0;
+        struct dirent *ent = readdir(wal_dir);
+        if(ent == NULL) {
+            if(errno != 0) {
+                fprintf(stderr, "readdir(): %m\n");
+                abort();
+            }
+            break;
+        }
+
+        const char *fname = ent->d_name;
+        if(strcmp(fname, ".") == 0 || strcmp(fname, "..") == 0) continue;
+
+        wal_files += 1;
+        if(fstatat(wal_dfd, fname, &st, AT_SYMLINK_NOFOLLOW) >= 0) {
+            wal_bytes += st.st_size;
+        }
+    }
+    closedir(wal_dir);
+
+    if(wal_files <= 8) return false;
+    return (wal_bytes >> 1) >= base_size;
+}
+
+// Decide, under the `state` lock, whether this process becomes the archiver.
+//
+// Returns true with the `backup` and `wait` locks held when it does.
+// Otherwise returns false with neither held:
+//  - another process already holds `backup`; with `force` set, the state
+//    byte is set to 2 so that the running archiver notices the request;
+//  - `backup` was free, but `force` is unset and the state byte is absent
+//    or zero, so there is nothing to do.
+static bool bak_begin(bool force) {
+    bool become_archiver = false;
+
+    lck_wait(0);
+    if(lck_try(1)) {
+        if(force || lck_read(0) > 0) {
+            lck_wait(2);
+            become_archiver = true;
+        } else {
+            lck_release(1);
+        }
+    } else if(force) {
+        lck_write(0, 2);
+    }
+    lck_release(0);
+
+    return become_archiver;
+}
+
+// Archiver main loop. Must be called with the `backup` and `wait` locks held
+// (see bak_begin()).
+//
+// Each round sets the state byte to 1, runs `../scripts/backup NAME TS` with
+// the current snapshot directory as working directory, and creates a new
+// snapshot when there is none or should_pgbasebackup() asks for one. It then
+// re-reads the state byte under the `state` lock:
+//  - above 1: another request arrived meanwhile, so go around again with a
+//    fresh timestamp;
+//  - otherwise, if nothing failed: store 0 and return, still holding the
+//    `state` lock;
+//  - otherwise: sleep with a growing, jittered backoff and retry.
+static void bak_work(void) {
+    // Backoff is counted in units of 1/512 second; this is 4 seconds.
+    unsigned int backoff = 4 * 512;
+    char name[32];
+    char ts_s[32];
+    const char *const cmd[] = {
+        "../scripts/backup", name, ts_s, NULL,
+    };
+
+    int subdfd = -1;
+
+    // Resolve $PGBAK/current to the snapshot name. A missing symlink or an
+    // unopenable target leaves subdfd at -1, which forces a new snapshot.
+    {
+        ssize_t r = readlinkat(backup_dfd, "current", name, sizeof(name));
+        if(r < 0) {
+            if(errno != ENOENT) {
+                fprintf(stderr, "Failed to readlink() $PGBAK/current: %m\n");
+                exit(1);
+            }
+        } else if((size_t)r >= sizeof(name)) {
+            fprintf(stderr, "Symlink $PGBAK/current target too long\n");
+            exit(1);
+        } else {
+            name[r] = 0;
+            subdfd = openat(backup_dfd, name, O_RDONLY | O_DIRECTORY | O_CLOEXEC);
+        }
+    }
+
+    bool want_ts = true;
+    while(true) {
+        lck_write(0, 1);
+
+        bool want_snapshot = false;
+        const char *failed = NULL;
+        if(subdfd >= 0) {
+            // The timestamp is kept across retries and only refreshed when
+            // a new request has been observed.
+            if(want_ts) {
+                struct timespec ts;
+                if(clock_gettime(CLOCK_REALTIME, &ts) < 0) {
+                    fprintf(stderr, "clock_gettime(CLOCK_REALTIME): %m\n");
+                    abort();
+                }
+                sprintf(ts_s, "%lld", (long long)ts.tv_sec);
+                want_ts = false;
+            }
+
+            pid_t child = cld_spawn(-1, -1, subdfd, cmd);
+            if(cld_wait(child, cmd[0])) {
+                failed = "backup script";
+            } else {
+                fprintf(stderr, "Finished backup %s-%s\n", name, ts_s);
+            }
+
+            if(should_pgbasebackup(subdfd)) {
+                close(subdfd);
+                want_snapshot = true;
+            }
+        } else {
+            want_snapshot = true;
+        }
+
+        if(want_snapshot) {
+            subdfd = bak_snapshot(name);
+            if(subdfd < 0) {
+                failed = "pg_basebackup";
+            }
+        }
+
+        lck_wait(0);
+        int r = lck_read(0);
+        if(r < 0) {
+            fprintf(stderr, "Lock file truncated: %m\n");
+            abort();
+        }
+        if(r > 1) {
+            want_ts = true;
+        } else if(!failed) {
+            // Done. The `state` lock stays held on the way out.
+            lck_write(0, 0);
+            break;
+        }
+        lck_release(0);
+
+        if(failed) {
+            // Grow by 25%, cap at 153600/512 = 300 seconds, then add up to
+            // roughly 2 seconds of jitter.
+            backoff += backoff >> 2;
+            if(backoff > 153600) backoff = 153600;
+            // Zero-initialised so a failing getentropy() just means no
+            // jitter instead of reading an indeterminate value.
+            unsigned rnd = 0;
+            (void)getentropy(&rnd, sizeof(rnd));
+            backoff += rnd & 1023;
+
+            struct timespec tv = {
+                .tv_sec = backoff / 512,
+                // 1953125 ns is exactly 1/512 second.
+                .tv_nsec = backoff % 512 * 1953125,
+            };
+            fprintf(stderr, "Error in %s, retrying in %u.%09u seconds\n", failed, (unsigned)tv.tv_sec, (unsigned)tv.tv_nsec);
+            (void)nanosleep(&tv, NULL);
+        } else {
+            backoff = 4 * 512;
+        }
+    }
+}
+
+// SIGALRM handler installed by cmd_wait(): terminates the process with exit
+// status 1 when the wait timeout expires.
+//
+// The parameter is named because omitting parameter names in a function
+// definition is only valid from C23 on.
+static void on_sigalrm(int sig) {
+    (void)sig;
+    _Exit(1);
+}
+
+// Wait for the archiver to finish by acquiring the `wait` lock.
+//
+// `timeout` is in seconds:
+//  - 0: do not block; return 0 if the lock could be taken, 1 otherwise;
+//  - positive and not above UINT_MAX: arm a SIGALRM whose handler exits the
+//    process with status 1, then block;
+//  - anything else: block without a time limit.
+//
+// Returns 0 once the lock is held.
+static int cmd_wait(long timeout) {
+    if(timeout == 0) {
+        return lck_try(2) ? 0 : 1;
+    }
+
+    bool arm_alarm = timeout > 0 && (unsigned long)timeout <= UINT_MAX;
+    if(arm_alarm) {
+        signal(SIGALRM, on_sigalrm);
+        alarm((unsigned)timeout);
+    }
+
+    lck_wait(2);
+    return 0;
+}
+
+// Archive one WAL segment into $PGBAK/current/pg_wal, then make sure an
+// archiver process is running.
+//
+// `name` must have the form "pg_wal/FILE", with a non-empty FILE containing
+// no '/', and be shorter than 192 bytes; it is opened relative to the
+// current working directory.
+//
+// Returns 0 when the segment was stored or already existed, and 1 on
+// invalid input or any failure. When no current snapshot exists yet
+// (ENOENT), nothing is stored and only the archiver is started.
+static int cmd_wal(const char *name) {
+    size_t name_len = strlen(name);
+    if(name_len < 8 || name_len >= 192 || !!memcmp(name, "pg_wal/", 7) || memchr(name + 7, '/', name_len - 7)) {
+        fprintf(stderr, "Invalid WAL file name '%s'\n", name);
+        return 1;
+    }
+
+    int wal_dfd = openat(backup_dfd, "current/pg_wal", O_RDONLY | O_DIRECTORY | O_CLOEXEC);
+    if(wal_dfd < 0) {
+        if(errno == ENOENT) goto do_full;
+        fprintf(stderr, "Failed to open $PGBAK/current/pg_wal: %m\n");
+        return 1;
+    }
+
+    // Output name: FILE followed by WAL_EXT (the terminating NUL is copied
+    // along with the extension).
+    char name_out[256];
+    memcpy(name_out, name + 7, name_len - 7);
+    memcpy(name_out + name_len - 7, WAL_EXT, strlen(WAL_EXT) + 1);
+
+    if(faccessat(wal_dfd, name_out, R_OK, AT_SYMLINK_NOFOLLOW) >= 0) {
+        fprintf(stderr, "WAL backup exists at $PGBAK/current/%s, skipping\n", name_out);
+        // Close the directory here as on every other exit path.
+        close(wal_dfd);
+        return 0;
+    }
+
+    // Write into an anonymous file and link it in only when complete, so a
+    // partial archive is never visible under its final name.
+    int out_fd = openat(wal_dfd, ".", O_WRONLY | O_TMPFILE | O_CLOEXEC, 0600);
+    if(out_fd < 0) {
+        fprintf(stderr, "Failed to create WAL backup file at $PGBAK/current: %m\n");
+        goto fail_dir;
+    }
+
+    int in_fd = open(name, O_RDONLY | O_CLOEXEC);
+    if(in_fd < 0) {
+        fprintf(stderr, "Failed to open WAL file %s: %m\n", name);
+        goto fail;
+    }
+
+    fprintf(stderr, "Archiving %s\n", name);
+
+    // cld_pipe() closes in_fd in this process.
+    if(cld_pipe(in_fd, out_fd, WAL_COMPRESS_ARGV)) {
+        fprintf(stderr, "Failed to create WAL backup\n");
+        goto fail;
+    }
+
+    char fd_path[32];
+    snprintf(fd_path, sizeof(fd_path), "/proc/self/fd/%d", out_fd);
+    if(linkat(AT_FDCWD, fd_path, wal_dfd, name_out, AT_SYMLINK_FOLLOW) < 0) {
+        fprintf(stderr, "Failed to link WAL backup: %m\n");
+        goto fail;
+    }
+    close(out_fd);
+
+    if(fsync(wal_dfd) < 0) {
+        fprintf(stderr, "fsync(): %m\n");
+        goto fail_dir;
+    }
+    close(wal_dfd);
+
+do_full:
+    // Request an archive run. If another archiver is already active,
+    // bak_begin() has flagged the request and there is nothing more to do.
+    if(!bak_begin(true)) return 0;
+
+    // Run the archiver in a child so the caller is not kept waiting.
+    pid_t r = fork();
+    if(r < 0) {
+        fprintf(stderr, "fork(): %m\n");
+        abort();
+    }
+    if(r != 0) return 0;
+
+    bak_work();
+    exit(0);
+
+fail:
+    close(out_fd);
+fail_dir:
+    close(wal_dfd);
+    return 1;
+}
+
+// Entry point.
+//
+//   wal PATH            archive one WAL segment and start an archiver
+//   sync                run the archiver if work is pending, else wait for it
+//   force-sync          request an archive run; run it or wait for the
+//                       active archiver
+//   wait [TIMEOUT_SEC]  wait for the archiver, optionally with a timeout
+//
+// $PGBAK must name the backup directory as an absolute path. Returns 0 on
+// success and 1 on usage or setup errors.
+int main(int argc, char **argv) {
+    // Declared before any `goto usage`: a label directly followed by a
+    // declaration is not valid before C23.
+    const char *arg0 = (argc > 0 && argv[0]) ? argv[0] : "pgbak";
+
+    // `< 2` also covers argc == 0, where argv[1] must not be read.
+    if(argc < 2) goto usage;
+
+    const char *backup_dir = getenv("PGBAK");
+    if(!backup_dir || backup_dir[0] != '/') {
+        fprintf(stderr, "$PGBAK not set to an absolute path\n");
+        return 1;
+    }
+
+    backup_dfd = open(backup_dir, O_RDONLY | O_DIRECTORY | O_CLOEXEC);
+    if(backup_dfd < 0) {
+        fprintf(stderr, "Failed to open $PGBAK directory '%s': %m\n", backup_dir);
+        return 1;
+    }
+
+    lck_fd = openat(backup_dfd, "pgbak.lock", O_RDWR | O_CREAT | O_CLOEXEC, 0600);
+    if(lck_fd < 0) {
+        fprintf(stderr, "Failed to open $PGBAK/pgbak.lock: %m\n");
+        return 1;
+    }
+
+    const char *op = argv[1];
+
+    if(!strcmp(op, "wal")) {
+        if(argc != 3) goto usage;
+        return cmd_wal(argv[2]);
+    }
+
+    // `sync` and `force-sync` differ only in the flag given to bak_begin().
+    bool force = !strcmp(op, "force-sync");
+    if(force || !strcmp(op, "sync")) {
+        if(argc != 2) goto usage;
+        if(bak_begin(force)) {
+            bak_work();
+            return 0;
+        }
+        // Not the archiver: wait without limit for the active one.
+        return cmd_wait(-1);
+    }
+
+    if(!strcmp(op, "wait")) {
+        if(argc != 2 && argc != 3) goto usage;
+        long timeout = -1;
+        if(argc > 2) {
+            char *ep = NULL;
+            timeout = strtol(argv[2], &ep, 10);
+            if(timeout < 0 || ep == argv[2] || *ep != 0) goto usage;
+        }
+        return cmd_wait(timeout);
+    }
+
+usage:
+    fprintf(stderr, "Usage:\n\t%s wal PATH\n\t%s sync\n\t%s force-sync\n\t%s wait [TIMEOUT_SEC]\n", arg0, arg0, arg0, arg0);
+    return 1;
+}