diff options
Diffstat (limited to 'pgbak.c')
-rw-r--r-- | pgbak.c | 569 |
1 files changed, 569 insertions, 0 deletions
@@ -0,0 +1,569 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// +// Copyright 2021 Hristo Venev + +#define _GNU_SOURCE +#include <dirent.h> +#include <errno.h> +#include <fcntl.h> +#include <limits.h> +#include <stdbool.h> +#include <stddef.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/file.h> +#include <sys/stat.h> +#include <sys/wait.h> +#include <time.h> +#include <unistd.h> + +#include "config.h" + +static int backup_dfd; +static int lck_fd; + + +static const char *const LOCK_KIND[] = { + // The `state` lock must be held while doing state-related updates, for + // example modifying the "archive needed" flag. + "state", + + // The `backup` lock is held by the archiver process. + "backup", + + // The `wait` lock is acquired by the archiver process after the `backup` + // lock. It is also acquired by processes that wait for the archiver to + // finish. + "wait", +}; + +static void lck_release(int pos) { + struct flock l = { + .l_type = F_UNLCK, + .l_whence = SEEK_SET, + .l_start = pos, + .l_len = 1, + }; + while(true) { + if(fcntl(lck_fd, F_OFD_SETLKW, &l) >= 0) return; + if(errno == EINTR) continue; + fprintf(stderr, "Failed to release %s lock: %m\n", LOCK_KIND[pos]); + abort(); + } +} + +static void lck_wait(int pos) { + struct flock l = { + .l_type = F_WRLCK, + .l_whence = SEEK_SET, + .l_start = pos, + .l_len = 1, + }; + while(true) { + if(fcntl(lck_fd, F_OFD_SETLKW, &l) >= 0) return; + if(errno == EINTR) continue; + fprintf(stderr, "Failed to acquire %s lock: %m\n", LOCK_KIND[pos]); + abort(); + } +} + +static bool lck_try(int pos) { + struct flock l = { + .l_type = F_WRLCK, + .l_whence = SEEK_SET, + .l_start = pos, + .l_len = 1, + }; + while(true) { + if(fcntl(lck_fd, F_OFD_SETLK, &l) >= 0) return true; + if(errno == EINTR) continue; + if(errno == EAGAIN || errno == EACCES) return false; + fprintf(stderr, "Failed to acquire %s lock: %m\n", LOCK_KIND[pos]); + abort(); + } +} + +static int lck_read(int pos) { + unsigned char b; + ssize_t r = pread(lck_fd, &b, 1, pos); + if(r < 0) { + fprintf(stderr, "Failed to read lock file: %m\n"); + abort(); + } + if(r >= 1) return b; + return -1; +} + +static void lck_write(int pos, unsigned char b) { + ssize_t r = pwrite(lck_fd, &b, 1, pos); + if(r < 0) { + fprintf(stderr, "Failed to read lock file: %m\n"); + abort(); + } + if(r == 0) { + fprintf(stderr, "Failed to write lock file: Empty write\n"); + abort(); + } +} + +static pid_t cld_spawn(int in_fd, int out_fd, int chdir_fd, const char *const *argv) { + pid_t child = fork(); + if(child < 0) { + fprintf(stderr, "fork(): %m\n"); + abort(); + } + if(child == 0) { + if(chdir_fd >= 0 && fchdir(chdir_fd) < 0) _exit(1); + if(in_fd >= 0 && dup2(in_fd, 0) < 0) _exit(1); + if(out_fd >= 0 && dup2(out_fd, 1) < 0) _exit(1); + execvpe(argv[0], (char*const*)argv, environ); + _exit(1); + } + return child; +} + +static int cld_wait(pid_t child, const char *cld) { + int status; + if(waitpid(child, &status, 0) < 0) { + fprintf(stderr, "waitpid(): %m\n"); + abort(); + } + + if(WIFEXITED(status)) { + int r = WEXITSTATUS(status); + if(r) { + fprintf(stderr, "Child process '%s' exited with status %d\n", cld, r); + } + return r; + } else if(WIFSIGNALED(status)) { + int r = WTERMSIG(status); + fprintf(stderr, "Child process '%s' killed by signal %d\n", cld, r); + return 128 + r; + } else { + fprintf(stderr, "waitpid(): Unknown status\n"); + abort(); + } +} + +static int cld_pipe(int in_fd, int out_fd, const char *const *argv) { + pid_t child = cld_spawn(in_fd, out_fd, -1, argv); + if(in_fd >= 0) close(in_fd); + + int r = cld_wait(child, argv[0]); + if(r == 0 && out_fd >= 0 && fsync(out_fd) < 0) { + fprintf(stderr, "fsync(): %m\n"); + r = -1; + } + return r; +} + + +static int bak_snapshot(char *name) { + { + struct timespec ts; + if(clock_gettime(CLOCK_REALTIME, &ts) < 0) { + fprintf(stderr, "clock_gettime(CLOCK_REALTIME): %m\n"); + abort(); + } + sprintf(name, "%lld", (long long)ts.tv_sec); + } + + if(mkdirat(backup_dfd, name, 0700) < 0) { + fprintf(stderr, "Failed to create $PGBAK/%s: %m\n", name); + return -1; + } + + int subdfd = openat(backup_dfd, name, O_RDONLY | O_DIRECTORY | O_CLOEXEC); + if(subdfd < 0) { + fprintf(stderr, "Failed to open $PGBAK/%s: %m\n", name); + goto fail_rmdir; + } + + if(mkdirat(subdfd, "pg_wal", 0700) < 0) { + fprintf(stderr, "Failed to create $PGBAK/%s/pg_wal: %m\n", name); + goto fail_close; + } + + int out_fd = -1; + out_fd = openat(subdfd, ".", O_WRONLY | O_TMPFILE | O_CLOEXEC, 0600); + if(out_fd < 0) { + fprintf(stderr, "Failed to create $PGBAK/%s/%s: %m\n", name, BASE_OUT); + goto fail_clean; + } + + (void)unlinkat(backup_dfd, "current", 0); + if(symlinkat(name, backup_dfd, "current") < 0) { + fprintf(stderr, "Failed to update $PGBAK/current: %m\n"); + goto fail; + } + + int pfd[2]; + if(pipe(pfd) < 0) { + fprintf(stderr, "pipe(): %m\n"); + goto fail; + } + + fprintf(stderr, "Creating full backup in $PGBAK/%s/%s\n", name, BASE_OUT); + + pid_t pid_backup = cld_spawn(-1, pfd[1], -1, BASE_BACKUP_ARGV); + close(pfd[1]); + + if(cld_pipe(pfd[0], out_fd, BASE_COMPRESS_ARGV)) { + (void)kill(pid_backup, SIGKILL); + (void)waitpid(pid_backup, NULL, 0); + goto fail; + } + if(cld_wait(pid_backup, BASE_BACKUP_ARGV[0])) { + goto fail; + } + + char fd_path[32]; + sprintf(fd_path, "/proc/self/fd/%d", out_fd); + if(linkat(AT_FDCWD, fd_path, subdfd, BASE_OUT, AT_SYMLINK_FOLLOW) < 0) { + fprintf(stderr, "Failed to link base backup: %m\n"); + goto fail; + } + + close(out_fd); + if(fsync(subdfd) < 0 || fsync(backup_dfd) < 0) { + fprintf(stderr, "fsync(): %m\n"); + abort(); + } + return subdfd; + +fail: + close(out_fd); +fail_clean: + (void)unlinkat(subdfd, "pg_wal", AT_REMOVEDIR); +fail_close: + close(subdfd); +fail_rmdir: + (void)unlinkat(backup_dfd, name, AT_REMOVEDIR); + return -1; +} + +static bool should_pgbasebackup(int subdfd) { + struct stat st; + + if(fstatat(subdfd, BASE_OUT, &st, AT_SYMLINK_NOFOLLOW) < 0) { + fprintf(stderr, "Failed to stat %s: %m\n", BASE_OUT); + return true; + } + if(!S_ISREG(st.st_mode)) { + fprintf(stderr, "Expected regular file at %s\n", BASE_OUT); + return true; + } + + off_t base_size = st.st_size; + off_t wal_size = 0; + off_t wal_count = 0; + + int wal_dfd = openat(subdfd, "pg_wal", O_RDONLY | O_DIRECTORY | O_CLOEXEC); + if(wal_dfd < 0) { + fprintf(stderr, "Failed to open $PGBAK/current/pg_wal: %m\n"); + return true; + } + + DIR *wal_dir = fdopendir(wal_dfd); + if(!wal_dir) { + fprintf(stderr, "fdopendir(): %m\n"); + abort(); + } + + while(true) { + errno = 0; + struct dirent *ent = readdir(wal_dir); + if(!ent) { + if(errno == 0) break; + fprintf(stderr, "readdir(): %m\n"); + abort(); + } + if(!strcmp(ent->d_name, ".") || !strcmp(ent->d_name, "..")) continue; + + wal_count += 1; + if(fstatat(wal_dfd, ent->d_name, &st, AT_SYMLINK_NOFOLLOW) < 0) continue; + wal_size += st.st_size; + } + closedir(wal_dir); + + return wal_count > 8 && (wal_size >> 1) >= base_size; +} + +// Should we become an archiver? If so, acquire `backup` and `wait` locks. +static bool bak_begin(bool force) { + bool r = false; + lck_wait(0); + + if(!lck_try(1)) { + if(force) lck_write(0, 2); + goto end; + } + + if(!force && lck_read(0) <= 0) { + lck_release(1); + goto end; + } + + lck_wait(2); + r = true; + +end: + lck_release(0); + return r; +} + +// Archiver +static void bak_work(void) { + unsigned int backoff = 4 * 512; + char name[32]; + char ts_s[32]; + const char *const cmd[] = { + "../scripts/backup", name, ts_s, NULL, + }; + + int subdfd = -1; + + { + ssize_t r = readlinkat(backup_dfd, "current", name, sizeof(name)); + if(r < 0) { + if(errno != ENOENT) { + fprintf(stderr, "Failed to readlink() $PGBAK/current: %m\n"); + exit(1); + } + } else if((size_t)r >= sizeof(name)) { + fprintf(stderr, "Symlink $PGBAK/current target too long\n"); + exit(1); + } else { + name[r] = 0; + subdfd = openat(backup_dfd, name, O_RDONLY | O_DIRECTORY | O_CLOEXEC); + } + } + + bool want_ts = true; + while(true) { + lck_write(0, 1); + + bool want_snapshot = false; + const char *failed = NULL; + if(subdfd >= 0) { + if(want_ts) { + struct timespec ts; + if(clock_gettime(CLOCK_REALTIME, &ts) < 0) { + fprintf(stderr, "clock_gettime(CLOCK_REALTIME): %m\n"); + abort(); + } + sprintf(ts_s, "%lld", (long long)ts.tv_sec); + want_ts = false; + } + + pid_t child = cld_spawn(-1, -1, subdfd, cmd); + if(cld_wait(child, cmd[0])) { + failed = "backup script"; + } else { + fprintf(stderr, "Finished backup %s-%s\n", name, ts_s); + } + + if(should_pgbasebackup(subdfd)) { + close(subdfd); + want_snapshot = true; + } + } else { + want_snapshot = true; + } + + if(want_snapshot) { + subdfd = bak_snapshot(name); + if(subdfd < 0) { + failed = "pg_basebackup"; + } + } + + lck_wait(0); + int r = lck_read(0); + if(r < 0) { + fprintf(stderr, "Lock file truncated: %m\n"); + abort(); + } + if(r > 1) { + want_ts = true; + } else if(!failed) { + lck_write(0, 0); + break; + } + lck_release(0); + + if(failed) { + backoff += backoff >> 2; + if(backoff > 153600) backoff = 153600; + unsigned rnd; + (void)getentropy(&rnd, sizeof(rnd)); + backoff += rnd & 1023; + + struct timespec tv = { + .tv_sec = backoff / 512, + .tv_nsec = backoff % 512 * 1953125, + }; + fprintf(stderr, "Error in %s, retrying in %u.%09u seconds\n", failed, (unsigned)tv.tv_sec, (unsigned)tv.tv_nsec); + (void)nanosleep(&tv, NULL); + } else { + backoff = 4 * 512; + } + } +} + +static void on_sigalrm(int) { + _Exit(1); +} + +static int cmd_wait(long timeout) { + if(timeout == 0) { + if(lck_try(2)) return 0; + return 1; + } + if(timeout > 0 && (unsigned long)timeout <= UINT_MAX) { + signal(SIGALRM, on_sigalrm); + alarm((unsigned)timeout); + } + lck_wait(2); + return 0; +} + +static int cmd_wal(const char *name) { + size_t name_len = strlen(name); + if(name_len < 8 || name_len >= 192 || !!memcmp(name, "pg_wal/", 7) || memchr(name + 7, '/', name_len - 7)) { + fprintf(stderr, "Invalid WAL file name '%s'\n", name); + return 1; + } + + int wal_dfd = openat(backup_dfd, "current/pg_wal", O_RDONLY | O_DIRECTORY | O_CLOEXEC); + if(wal_dfd < 0) { + if(errno == ENOENT) goto do_full; + fprintf(stderr, "Failed to open $PGBAK/current/pg_wal: %m\n"); + return 1; + } + + char name_out[256]; + memcpy(name_out, name + 7, name_len - 7); + memcpy(name_out + name_len - 7, WAL_EXT, strlen(WAL_EXT) + 1); + + if(faccessat(wal_dfd, name_out, R_OK, AT_SYMLINK_NOFOLLOW) >= 0) { + fprintf(stderr, "WAL backup exists at $PGBAK/current/%s, skipping\n", name_out); + return 0; + } + + int out_fd = openat(wal_dfd, ".", O_WRONLY | O_TMPFILE | O_CLOEXEC, 0600); + if(out_fd < 0) { + fprintf(stderr, "Failed to create WAL backup file at $PGBAK/current: %m\n"); + goto fail_dir; + } + + int in_fd = open(name, O_RDONLY | O_CLOEXEC); + if(in_fd < 0) { + fprintf(stderr, "Failed to open WAL file %s: %m\n", name); + goto fail; + } + + fprintf(stderr, "Archiving %s\n", name); + + if(cld_pipe(in_fd, out_fd, WAL_COMPRESS_ARGV)) { + fprintf(stderr, "Failed to create WAL backup\n"); + goto fail; + } + + char fd_path[32]; + sprintf(fd_path, "/proc/self/fd/%d", out_fd); + if(linkat(AT_FDCWD, fd_path, wal_dfd, name_out, AT_SYMLINK_FOLLOW) < 0) { + fprintf(stderr, "Failed to link WAL backup: %m\n"); + goto fail; + } + close(out_fd); + + if(fsync(wal_dfd) < 0) { + fprintf(stderr, "fsync(): %m\n"); + goto fail_dir; + } + close(wal_dfd); + +do_full: + if(!bak_begin(true)) return 0; + + pid_t r = fork(); + if(r < 0) { + fprintf(stderr, "fork(): %m\n"); + abort(); + } + if(r != 0) return 0; + + bak_work(); + exit(0); + +fail: + close(out_fd); +fail_dir: + close(wal_dfd); + return 1; +} + +int main(int argc, char **argv) { + if(argc == 1) goto usage; + + const char *backup_dir = getenv("PGBAK"); + if(!backup_dir || backup_dir[0] != '/') { + fprintf(stderr, "$PGBAK not set to an absolute path\n"); + return 1; + } + + backup_dfd = open(backup_dir, O_RDONLY | O_DIRECTORY | O_CLOEXEC); + if(backup_dfd < 0) { + fprintf(stderr, "Failed to open $PGBAK directory '%s': %m\n", backup_dir); + return 1; + } + + lck_fd = openat(backup_dfd, "pgbak.lock", O_RDWR | O_CREAT | O_CLOEXEC, 0600); + if(lck_fd < 0) { + fprintf(stderr, "Failed to open $PGBAK/pgbak.lock: %m\n"); + return 1; + } + + const char *op = argv[1]; + + if(!strcmp(op, "wal")) { + if(argc != 3) goto usage; + return cmd_wal(argv[2]); + } + + if(!strcmp(op, "sync")) { + if(argc != 2) goto usage; + if(bak_begin(false)) { + bak_work(); + return 0; + } + return cmd_wait(-1); + } + + if(!strcmp(op, "force-sync")) { + if(argc != 2) goto usage; + if(bak_begin(true)) { + bak_work(); + return 0; + } + return cmd_wait(-1); + } + + if(!strcmp(op, "wait")) { + if(argc != 2 && argc != 3) goto usage; + long timeout = -1; + if(argc > 2) { + char *ep = NULL; + timeout = strtol(argv[2], &ep, 10); + if(timeout < 0 || ep == argv[2] || *ep != 0) goto usage; + } + return cmd_wait(timeout); + } + +usage: + char *arg0 = argv[0]; + fprintf(stderr, "Usage:\n\t%s wal PATH\n\t%s sync\n\t%s force-sync\n\t%s wait [TIMEOUT_SEC]\n", arg0, arg0, arg0, arg0); + return 1; +} |