// SPDX-License-Identifier: LGPL-3.0-or-later // // Copyright 2021 Hristo Venev #define _GNU_SOURCE #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "config.h" static int backup_dfd; static int lck_fd; static const char *const LOCK_KIND[] = { // The `state` lock must be held while doing state-related updates, for // example modifying the "archive needed" flag. "state", // The `backup` lock is held by the archiver process. "backup", // The `wait` lock is acquired by the archiver process after the `backup` // lock. It is also acquired by processes that wait for the archiver to // finish. "wait", }; static void lck_release(int pos) { struct flock l = { .l_type = F_UNLCK, .l_whence = SEEK_SET, .l_start = pos, .l_len = 1, }; while(true) { if(fcntl(lck_fd, F_OFD_SETLKW, &l) >= 0) return; if(errno == EINTR) continue; fprintf(stderr, "Failed to release %s lock: %m\n", LOCK_KIND[pos]); abort(); } } static void lck_wait(int pos) { struct flock l = { .l_type = F_WRLCK, .l_whence = SEEK_SET, .l_start = pos, .l_len = 1, }; while(true) { if(fcntl(lck_fd, F_OFD_SETLKW, &l) >= 0) return; if(errno == EINTR) continue; fprintf(stderr, "Failed to acquire %s lock: %m\n", LOCK_KIND[pos]); abort(); } } static bool lck_try(int pos) { struct flock l = { .l_type = F_WRLCK, .l_whence = SEEK_SET, .l_start = pos, .l_len = 1, }; while(true) { if(fcntl(lck_fd, F_OFD_SETLK, &l) >= 0) return true; if(errno == EINTR) continue; if(errno == EAGAIN || errno == EACCES) return false; fprintf(stderr, "Failed to acquire %s lock: %m\n", LOCK_KIND[pos]); abort(); } } static int lck_read(int pos) { unsigned char b; ssize_t r = pread(lck_fd, &b, 1, pos); if(r < 0) { fprintf(stderr, "Failed to read lock file: %m\n"); abort(); } if(r >= 1) return b; return -1; } static void lck_write(int pos, unsigned char b) { ssize_t r = pwrite(lck_fd, &b, 1, pos); if(r < 0) { fprintf(stderr, "Failed to read lock file: %m\n"); abort(); } if(r == 0) { fprintf(stderr, "Failed to write lock file: Empty write\n"); abort(); } } static pid_t cld_spawn(int in_fd, int out_fd, int chdir_fd, const char *const *argv) { pid_t child = fork(); if(child < 0) { fprintf(stderr, "fork(): %m\n"); abort(); } if(child == 0) { if(chdir_fd >= 0 && fchdir(chdir_fd) < 0) _exit(1); if(in_fd >= 0 && dup2(in_fd, 0) < 0) _exit(1); if(out_fd >= 0 && dup2(out_fd, 1) < 0) _exit(1); execvpe(argv[0], (char*const*)argv, environ); _exit(1); } return child; } static int cld_wait(pid_t child, const char *cld) { int status; if(waitpid(child, &status, 0) < 0) { fprintf(stderr, "waitpid(): %m\n"); abort(); } if(WIFEXITED(status)) { int r = WEXITSTATUS(status); if(r) { fprintf(stderr, "Child process '%s' exited with status %d\n", cld, r); } return r; } else if(WIFSIGNALED(status)) { int r = WTERMSIG(status); fprintf(stderr, "Child process '%s' killed by signal %d\n", cld, r); return 128 + r; } else { fprintf(stderr, "waitpid(): Unknown status\n"); abort(); } } static int cld_pipe(int in_fd, int out_fd, const char *const *argv) { pid_t child = cld_spawn(in_fd, out_fd, -1, argv); if(in_fd >= 0) close(in_fd); int r = cld_wait(child, argv[0]); if(r == 0 && out_fd >= 0 && fsync(out_fd) < 0) { fprintf(stderr, "fsync(): %m\n"); r = -1; } return r; } static int bak_snapshot(char *name) { { struct timespec ts; if(clock_gettime(CLOCK_REALTIME, &ts) < 0) { fprintf(stderr, "clock_gettime(CLOCK_REALTIME): %m\n"); abort(); } sprintf(name, "%lld", (long long)ts.tv_sec); } if(mkdirat(backup_dfd, name, 0700) < 0) { fprintf(stderr, "Failed to create $PGBAK/%s: %m\n", name); return -1; } int subdfd = openat(backup_dfd, name, O_RDONLY | O_DIRECTORY | O_CLOEXEC); if(subdfd < 0) { fprintf(stderr, "Failed to open $PGBAK/%s: %m\n", name); goto fail_rmdir; } if(mkdirat(subdfd, "pg_wal", 0700) < 0) { fprintf(stderr, "Failed to create $PGBAK/%s/pg_wal: %m\n", name); goto fail_close; } int out_fd = -1; out_fd = openat(subdfd, ".", O_WRONLY | O_TMPFILE | O_CLOEXEC, 0600); if(out_fd < 0) { fprintf(stderr, "Failed to create $PGBAK/%s/%s: %m\n", name, BASE_OUT); goto fail_clean; } (void)unlinkat(backup_dfd, "current", 0); if(symlinkat(name, backup_dfd, "current") < 0) { fprintf(stderr, "Failed to update $PGBAK/current: %m\n"); goto fail; } int pfd[2]; if(pipe(pfd) < 0) { fprintf(stderr, "pipe(): %m\n"); goto fail; } fprintf(stderr, "Creating full backup in $PGBAK/%s/%s\n", name, BASE_OUT); pid_t pid_backup = cld_spawn(-1, pfd[1], -1, BASE_BACKUP_ARGV); close(pfd[1]); if(cld_pipe(pfd[0], out_fd, BASE_COMPRESS_ARGV)) { (void)kill(pid_backup, SIGKILL); (void)waitpid(pid_backup, NULL, 0); goto fail; } if(cld_wait(pid_backup, BASE_BACKUP_ARGV[0])) { goto fail; } char fd_path[32]; sprintf(fd_path, "/proc/self/fd/%d", out_fd); if(linkat(AT_FDCWD, fd_path, subdfd, BASE_OUT, AT_SYMLINK_FOLLOW) < 0) { fprintf(stderr, "Failed to link base backup: %m\n"); goto fail; } close(out_fd); if(fsync(subdfd) < 0 || fsync(backup_dfd) < 0) { fprintf(stderr, "fsync(): %m\n"); abort(); } return subdfd; fail: close(out_fd); fail_clean: (void)unlinkat(subdfd, "pg_wal", AT_REMOVEDIR); fail_close: close(subdfd); fail_rmdir: (void)unlinkat(backup_dfd, name, AT_REMOVEDIR); return -1; } static bool should_pgbasebackup(int subdfd) { struct stat st; if(fstatat(subdfd, BASE_OUT, &st, AT_SYMLINK_NOFOLLOW) < 0) { fprintf(stderr, "Failed to stat %s: %m\n", BASE_OUT); return true; } if(!S_ISREG(st.st_mode)) { fprintf(stderr, "Expected regular file at %s\n", BASE_OUT); return true; } off_t base_size = st.st_size; off_t wal_size = 0; off_t wal_count = 0; int wal_dfd = openat(subdfd, "pg_wal", O_RDONLY | O_DIRECTORY | O_CLOEXEC); if(wal_dfd < 0) { fprintf(stderr, "Failed to open $PGBAK/current/pg_wal: %m\n"); return true; } DIR *wal_dir = fdopendir(wal_dfd); if(!wal_dir) { fprintf(stderr, "fdopendir(): %m\n"); abort(); } while(true) { errno = 0; struct dirent *ent = readdir(wal_dir); if(!ent) { if(errno == 0) break; fprintf(stderr, "readdir(): %m\n"); abort(); } if(!strcmp(ent->d_name, ".") || !strcmp(ent->d_name, "..")) continue; wal_count += 1; if(fstatat(wal_dfd, ent->d_name, &st, AT_SYMLINK_NOFOLLOW) < 0) continue; wal_size += st.st_size; } closedir(wal_dir); return wal_count > 8 && (wal_size >> 1) >= base_size; } // Should we become an archiver? If so, acquire `backup` and `wait` locks. static bool bak_begin(bool force) { bool r = false; lck_wait(0); if(!lck_try(1)) { if(force) lck_write(0, 2); goto end; } if(!force && lck_read(0) <= 0) { lck_release(1); goto end; } lck_wait(2); r = true; end: lck_release(0); return r; } // Archiver static void bak_work(void) { unsigned int backoff = 4 * 512; char name[32]; char ts_s[32]; const char *const cmd[] = { "../scripts/backup", name, ts_s, NULL, }; int subdfd = -1; { ssize_t r = readlinkat(backup_dfd, "current", name, sizeof(name)); if(r < 0) { if(errno != ENOENT) { fprintf(stderr, "Failed to readlink() $PGBAK/current: %m\n"); exit(1); } } else if((size_t)r >= sizeof(name)) { fprintf(stderr, "Symlink $PGBAK/current target too long\n"); exit(1); } else { name[r] = 0; subdfd = openat(backup_dfd, name, O_RDONLY | O_DIRECTORY | O_CLOEXEC); } } bool want_ts = true; while(true) { lck_write(0, 1); bool want_snapshot = false; const char *failed = NULL; if(subdfd >= 0) { if(want_ts) { struct timespec ts; if(clock_gettime(CLOCK_REALTIME, &ts) < 0) { fprintf(stderr, "clock_gettime(CLOCK_REALTIME): %m\n"); abort(); } sprintf(ts_s, "%lld", (long long)ts.tv_sec); want_ts = false; } pid_t child = cld_spawn(-1, -1, subdfd, cmd); if(cld_wait(child, cmd[0])) { failed = "backup script"; } else { fprintf(stderr, "Finished backup %s-%s\n", name, ts_s); } if(should_pgbasebackup(subdfd)) { close(subdfd); want_snapshot = true; } } else { want_snapshot = true; } if(want_snapshot) { subdfd = bak_snapshot(name); if(subdfd < 0) { failed = "pg_basebackup"; } } lck_wait(0); int r = lck_read(0); if(r < 0) { fprintf(stderr, "Lock file truncated: %m\n"); abort(); } if(r > 1) { want_ts = true; } else if(!failed) { lck_write(0, 0); break; } lck_release(0); if(failed) { backoff += backoff >> 2; if(backoff > 153600) backoff = 153600; unsigned rnd; (void)getentropy(&rnd, sizeof(rnd)); backoff += rnd & 1023; struct timespec tv = { .tv_sec = backoff / 512, .tv_nsec = backoff % 512 * 1953125, }; fprintf(stderr, "Error in %s, retrying in %u.%09u seconds\n", failed, (unsigned)tv.tv_sec, (unsigned)tv.tv_nsec); (void)nanosleep(&tv, NULL); } else { backoff = 4 * 512; } } } static void on_sigalrm(int) { _Exit(1); } static int cmd_wait(long timeout) { if(timeout == 0) { if(lck_try(2)) return 0; return 1; } if(timeout > 0 && (unsigned long)timeout <= UINT_MAX) { signal(SIGALRM, on_sigalrm); alarm((unsigned)timeout); } lck_wait(2); return 0; } static int cmd_wal(const char *name) { size_t name_len = strlen(name); if(name_len < 8 || name_len >= 192 || !!memcmp(name, "pg_wal/", 7) || memchr(name + 7, '/', name_len - 7)) { fprintf(stderr, "Invalid WAL file name '%s'\n", name); return 1; } int wal_dfd = openat(backup_dfd, "current/pg_wal", O_RDONLY | O_DIRECTORY | O_CLOEXEC); if(wal_dfd < 0) { if(errno == ENOENT) goto do_full; fprintf(stderr, "Failed to open $PGBAK/current/pg_wal: %m\n"); return 1; } char name_out[256]; memcpy(name_out, name + 7, name_len - 7); memcpy(name_out + name_len - 7, WAL_EXT, strlen(WAL_EXT) + 1); if(faccessat(wal_dfd, name_out, R_OK, AT_SYMLINK_NOFOLLOW) >= 0) { fprintf(stderr, "WAL backup exists at $PGBAK/current/%s, skipping\n", name_out); return 0; } int out_fd = openat(wal_dfd, ".", O_WRONLY | O_TMPFILE | O_CLOEXEC, 0600); if(out_fd < 0) { fprintf(stderr, "Failed to create WAL backup file at $PGBAK/current: %m\n"); goto fail_dir; } int in_fd = open(name, O_RDONLY | O_CLOEXEC); if(in_fd < 0) { fprintf(stderr, "Failed to open WAL file %s: %m\n", name); goto fail; } fprintf(stderr, "Archiving %s\n", name); if(cld_pipe(in_fd, out_fd, WAL_COMPRESS_ARGV)) { fprintf(stderr, "Failed to create WAL backup\n"); goto fail; } char fd_path[32]; sprintf(fd_path, "/proc/self/fd/%d", out_fd); if(linkat(AT_FDCWD, fd_path, wal_dfd, name_out, AT_SYMLINK_FOLLOW) < 0) { fprintf(stderr, "Failed to link WAL backup: %m\n"); goto fail; } close(out_fd); if(fsync(wal_dfd) < 0) { fprintf(stderr, "fsync(): %m\n"); goto fail_dir; } close(wal_dfd); do_full: if(!bak_begin(true)) return 0; pid_t r = fork(); if(r < 0) { fprintf(stderr, "fork(): %m\n"); abort(); } if(r != 0) return 0; bak_work(); exit(0); fail: close(out_fd); fail_dir: close(wal_dfd); return 1; } int main(int argc, char **argv) { if(argc == 1) goto usage; const char *backup_dir = getenv("PGBAK"); if(!backup_dir || backup_dir[0] != '/') { fprintf(stderr, "$PGBAK not set to an absolute path\n"); return 1; } backup_dfd = open(backup_dir, O_RDONLY | O_DIRECTORY | O_CLOEXEC); if(backup_dfd < 0) { fprintf(stderr, "Failed to open $PGBAK directory '%s': %m\n", backup_dir); return 1; } lck_fd = openat(backup_dfd, "pgbak.lock", O_RDWR | O_CREAT | O_CLOEXEC, 0600); if(lck_fd < 0) { fprintf(stderr, "Failed to open $PGBAK/pgbak.lock: %m\n"); return 1; } const char *op = argv[1]; if(!strcmp(op, "wal")) { if(argc != 3) goto usage; return cmd_wal(argv[2]); } if(!strcmp(op, "sync")) { if(argc != 2) goto usage; if(bak_begin(false)) { bak_work(); return 0; } return cmd_wait(-1); } if(!strcmp(op, "force-sync")) { if(argc != 2) goto usage; if(bak_begin(true)) { bak_work(); return 0; } return cmd_wait(-1); } if(!strcmp(op, "wait")) { if(argc != 2 && argc != 3) goto usage; long timeout = -1; if(argc > 2) { char *ep = NULL; timeout = strtol(argv[2], &ep, 10); if(timeout < 0 || ep == argv[2] || *ep != 0) goto usage; } return cmd_wait(timeout); } usage: char *arg0 = argv[0]; fprintf(stderr, "Usage:\n\t%s wal PATH\n\t%s sync\n\t%s force-sync\n\t%s wait [TIMEOUT_SEC]\n", arg0, arg0, arg0, arg0); return 1; }