// SPDX-License-Identifier: LGPL-3.0-or-later // // Copyright 2021-2023 Hristo Venev #define _GNU_SOURCE #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "config.h" static int backup_dfd; static int lck_fd; // ===== Lock indices ===== // LOCK_STATE must be held while doing state-related updates, for example // modifying the "archive needed" flag. #define LOCK_STATE 0 // LOCK_BACKUP is held by the archiver process. It is try-acquired with // the state lock held, and then the state lock is released. #define LOCK_BACKUP 1 // LOCK_WAIT is acquired by the archiver process after successfully acquiring // the backup` // lock. It is also acquired by processes that wait for the archiver to // finish. #define LOCK_WAIT 2 static const char *const LOCK_KIND[] = { [LOCK_STATE] = "state", [LOCK_BACKUP] = "backup", [LOCK_WAIT] = "wait", }; // ===== State flags ===== #define STATE_MASK 0x7 // We need to run the backup script. #define S_WANT_ARCHIVE 0x08 // A "next" full snapshot should be made unless one is in progress. #define S_WANT_SNAPSHOT 0x10 // A "next" full snapshot needs to be made. #define S_WANT_NEW_SNAPSHOT 0x20 // Done. #define STATE_IDLE 0x0 // Run the backup script, goto STATE_IDLE #define STATE_ARCHIVE 0x1 // Rename next->current, goto STATE_ARCHIVE #define STATE_PROMOTE 0x2 // Run the backup script, goto STATE_PROMOTE #define STATE_PREARCHIVE 0x3 // Take a new snapshot, goto STATE_PREARCHIVE #define STATE_SNAPSHOT 0x4 static void lck_release(int pos) { struct flock l = { .l_type = F_UNLCK, .l_whence = SEEK_SET, .l_start = pos, .l_len = 1, }; while(true) { if(fcntl(lck_fd, F_OFD_SETLKW, &l) >= 0) return; if(errno == EINTR) continue; fprintf(stderr, "Failed to release %s lock: %m\n", LOCK_KIND[pos]); abort(); } } static void lck_wait(int pos) { struct flock l = { .l_type = F_WRLCK, .l_whence = SEEK_SET, .l_start = pos, .l_len = 1, }; while(true) { if(fcntl(lck_fd, F_OFD_SETLKW, &l) >= 0) return; if(errno == EINTR) continue; fprintf(stderr, "Failed to acquire %s lock: %m\n", LOCK_KIND[pos]); abort(); } } static bool lck_try(int pos) { struct flock l = { .l_type = F_WRLCK, .l_whence = SEEK_SET, .l_start = pos, .l_len = 1, }; while(true) { if(fcntl(lck_fd, F_OFD_SETLK, &l) >= 0) return true; switch(errno) { case EAGAIN: case EACCES: return false; case EINTR: break; default: fprintf(stderr, "Failed to acquire %s lock: %m\n", LOCK_KIND[pos]); abort(); } } } static int lck_read(void) { unsigned char b; ssize_t r = pread(lck_fd, &b, 1, 0); if(r < 0) { fprintf(stderr, "Failed to read lock file: %m\n"); abort(); } if(r == 0) return 0; return b; } static void lck_write(int v) { unsigned char b = (unsigned char)v; ssize_t r = pwrite(lck_fd, &b, 1, 0); if(r < 0) { fprintf(stderr, "Failed to read lock file: %m\n"); abort(); } if(r == 0) { fprintf(stderr, "Failed to write lock file: Empty write\n"); abort(); } } static pid_t cld_spawn(int in_fd, int out_fd, int chdir_fd, const char *const *argv) { pid_t child = fork(); if(child < 0) { fprintf(stderr, "fork(): %m\n"); abort(); } if(child == 0) { if(chdir_fd >= 0 && fchdir(chdir_fd) < 0) { perror("fchdir"); _Exit(1); } if(in_fd >= 0 && dup2(in_fd, 0) < 0) { perror("dup2"); _Exit(1); } if(out_fd >= 0 && dup2(out_fd, 1) < 0) { perror("dup2"); _Exit(1); } execvpe(argv[0], (char*const*)argv, environ); perror("execve"); _Exit(1); } return child; } static int cld_wait(pid_t child, const char *cld) { int status; if(waitpid(child, &status, 0) < 0) { fprintf(stderr, "waitpid(): %m\n"); abort(); } if(WIFEXITED(status)) { int r = WEXITSTATUS(status); if(r) { fprintf(stderr, "Child process '%s' exited with status %d\n", cld, r); } return r; } else if(WIFSIGNALED(status)) { int r = WTERMSIG(status); fprintf(stderr, "Child process '%s' killed by signal %d\n", cld, r); return 128 + r; } else { fprintf(stderr, "waitpid(): Unknown status\n"); abort(); } } static int cld_pipe(int in_fd, int out_fd, const char *const *argv) { pid_t child = cld_spawn(in_fd, out_fd, -1, argv); if(in_fd >= 0) close(in_fd); int r = cld_wait(child, argv[0]); if(r == 0 && out_fd >= 0 && fsync(out_fd) < 0) { fprintf(stderr, "fsync(): %m\n"); r = -1; } return r; } static int bak_snapshot(char *name) { if(mkdirat(backup_dfd, name, 0700) < 0) { fprintf(stderr, "Failed to create $PGBAK/%s: %m\n", name); return -1; } int subdfd = openat(backup_dfd, name, O_RDONLY | O_DIRECTORY | O_CLOEXEC); if(subdfd < 0) { fprintf(stderr, "Failed to open $PGBAK/%s: %m\n", name); goto fail_rmdir; } if(mkdirat(subdfd, "pg_wal", 0700) < 0) { fprintf(stderr, "Failed to create $PGBAK/%s/pg_wal: %m\n", name); goto fail_close; } int out_fd = -1; out_fd = openat(subdfd, ".", O_WRONLY | O_TMPFILE | O_CLOEXEC, 0600); if(out_fd < 0) { fprintf(stderr, "Failed to create $PGBAK/%s/%s: %m\n", name, BASE_OUT); goto fail_clean; } (void)unlinkat(backup_dfd, "next", 0); if(symlinkat(name, backup_dfd, "next") < 0) { fprintf(stderr, "Failed to update $PGBAK/next: %m\n"); goto fail; } int pfd[2]; if(pipe(pfd) < 0) { fprintf(stderr, "pipe(): %m\n"); goto fail; } fprintf(stderr, "Creating full backup in $PGBAK/%s/%s\n", name, BASE_OUT); pid_t pid_backup = cld_spawn(-1, pfd[1], -1, BASE_BACKUP_ARGV); close(pfd[1]); if(cld_pipe(pfd[0], out_fd, BASE_COMPRESS_ARGV)) { (void)kill(pid_backup, SIGKILL); (void)waitpid(pid_backup, NULL, 0); goto fail; } if(cld_wait(pid_backup, BASE_BACKUP_ARGV[0])) { goto fail; } char fd_path[32]; sprintf(fd_path, "/proc/self/fd/%d", out_fd); if(linkat(AT_FDCWD, fd_path, subdfd, BASE_OUT, AT_SYMLINK_FOLLOW) < 0) { fprintf(stderr, "Failed to link base backup: %m\n"); goto fail; } close(out_fd); if(fsync(subdfd) < 0 || fsync(backup_dfd) < 0) { fprintf(stderr, "fsync(): %m\n"); abort(); } close(subdfd); return 0; fail: (void)unlinkat(backup_dfd, "next", 0); close(out_fd); fail_clean: (void)unlinkat(subdfd, "pg_wal", AT_REMOVEDIR); fail_close: close(subdfd); fail_rmdir: (void)unlinkat(backup_dfd, name, AT_REMOVEDIR); return -1; } static bool should_pgbasebackup(int subdfd) { struct stat st; if(fstatat(subdfd, BASE_OUT, &st, AT_SYMLINK_NOFOLLOW) < 0) { fprintf(stderr, "Failed to stat %s: %m; creating new snapshot\n", BASE_OUT); return true; } if(!S_ISREG(st.st_mode)) { fprintf(stderr, "Expected regular file at %s; creating new snapshot\n", BASE_OUT); return true; } off_t base_size = st.st_size; off_t wal_size = 0; off_t wal_count = 0; int wal_dfd = openat(subdfd, "pg_wal", O_RDONLY | O_DIRECTORY | O_CLOEXEC); if(wal_dfd < 0) { fprintf(stderr, "Failed to open $PGBAK/current/pg_wal: %m; creating new snapshot\n"); return true; } DIR *wal_dir = fdopendir(wal_dfd); if(!wal_dir) { fprintf(stderr, "fdopendir(): %m\n"); abort(); } while(true) { errno = 0; struct dirent *ent = readdir(wal_dir); if(!ent) { if(errno == 0) break; fprintf(stderr, "readdir(): %m\n"); abort(); } if(!strcmp(ent->d_name, ".") || !strcmp(ent->d_name, "..")) continue; wal_count += 1; if(fstatat(wal_dfd, ent->d_name, &st, AT_SYMLINK_NOFOLLOW) < 0) continue; wal_size += st.st_size; } closedir(wal_dir); if(wal_count > 8 && (wal_size >> 1) >= base_size) { fprintf(stderr, "WAL directroy too big; creating new snapshot\n"); return true; } return false; } static int bak_open_current(char *name, size_t namebuf) { ssize_t r = readlinkat(backup_dfd, "current", name, namebuf); if(r < 0) { if(errno != ENOENT) { fprintf(stderr, "Failed to readlink() $PGBAK/current: %m\n"); return -1; } fprintf(stderr, "Failed to open() $PGBAK/current: %m\n"); return -1; } if((size_t)r >= namebuf) { fprintf(stderr, "Symlink $PGBAK/current target too long\n"); return -1; } name[r] = 0; int subdfd = openat(backup_dfd, name, O_RDONLY | O_DIRECTORY | O_CLOEXEC); if(subdfd < 0) { fprintf(stderr, "Failed to open() $PGBAK/%s: %m\n", name); } return subdfd; } // Archiver. Called in a subprocess with LOCK_STATE. static void bak_work(int state) { unsigned int backoff = 4 * 512; char cur_name[32]; char ts_name[32]; const char *archive_cmd[] = { "../scripts/backup", cur_name, ts_name, NULL, NULL, }; lck_wait(LOCK_WAIT); bool did_snapshot = false; bool refresh_ts = true; int flags = state; state &= STATE_MASK; // Skip reopening if a full snapshot was requested. while(true) { switch(state) { case STATE_IDLE: if(flags & S_WANT_ARCHIVE) state = STATE_ARCHIVE; [[fallthrough]]; case STATE_ARCHIVE: if(flags & S_WANT_SNAPSHOT) state = STATE_SNAPSHOT; [[fallthrough]]; case STATE_PROMOTE: case STATE_PREARCHIVE: if(flags & S_WANT_NEW_SNAPSHOT) state = STATE_SNAPSHOT; break; default: state = STATE_SNAPSHOT; } lck_write(state); if(state == STATE_IDLE) break; lck_release(LOCK_STATE); if(state == STATE_SNAPSHOT || refresh_ts) { refresh_ts = false; struct timespec ts; if(clock_gettime(CLOCK_REALTIME, &ts) < 0) { fprintf(stderr, "clock_gettime(CLOCK_REALTIME): %m\n"); abort(); } sprintf(ts_name, "%lld", (long long)ts.tv_sec); } const char *failed = NULL; switch(state) { case STATE_SNAPSHOT: if(bak_snapshot(ts_name) < 0) { failed = "pg_basebackup"; break; } fprintf(stderr, "Finished snapshot %s\n", ts_name); did_snapshot = true; state = STATE_PREARCHIVE; break; case STATE_PROMOTE: if(renameat(backup_dfd, "next", backup_dfd, "current") < 0) { fprintf(stderr, "Failed to promote $PGBAK/next to current: %m\n"); state = STATE_SNAPSHOT; break; } if(fsync(backup_dfd) < 0) { fprintf(stderr, "fsync(): %m\n"); abort(); } state = STATE_ARCHIVE; break; case STATE_ARCHIVE: case STATE_PREARCHIVE: { int subdfd = bak_open_current(cur_name, sizeof(cur_name)); if(subdfd < 0) { if(state == STATE_ARCHIVE) { state = STATE_SNAPSHOT; } else { state = STATE_PROMOTE; } break; } archive_cmd[3] = (state == STATE_PREARCHIVE) ? "old" : NULL; pid_t child = cld_spawn(-1, -1, subdfd, archive_cmd); if(cld_wait(child, archive_cmd[0])) { close(subdfd); failed = "backup script"; break; } fprintf(stderr, "Finished archive %s-%s\n", cur_name, ts_name); if(state == STATE_PREARCHIVE) { state = STATE_PROMOTE; } else if(!did_snapshot && should_pgbasebackup(subdfd)) { state = STATE_SNAPSHOT; } else { state = STATE_IDLE; } close(subdfd); break; } } if(failed) { backoff += backoff >> 2; if(backoff > 153600) backoff = 153600; unsigned rnd; (void)getentropy(&rnd, sizeof(rnd)); backoff += rnd & 1023; struct timespec tv = { .tv_sec = backoff / 512, .tv_nsec = backoff % 512 * 1953125, }; fprintf(stderr, "Error in %s, retrying in %u.%09u seconds\n", failed, (unsigned)tv.tv_sec, (unsigned)tv.tv_nsec); (void)nanosleep(&tv, NULL); } else { backoff = 4 * 512; } lck_wait(LOCK_STATE); flags = lck_read(); } } // Maybe start an archiver. If an archiver is already running, return -1. // Otherwise return state flags (0 if nothing needs to be done). static int bak_begin(int flags) { lck_wait(LOCK_STATE); int state = lck_read(); state |= flags; if(!lck_try(LOCK_BACKUP)) { if(flags & ~state) { lck_write(state); } lck_release(LOCK_STATE); return -1; } // We got the backup lock. If anything was running, it must have crashed. // Mark it as pending. switch(state & STATE_MASK) { case STATE_SNAPSHOT: fprintf(stderr, "Resuming interrupted snapshot\n"); break; case STATE_PROMOTE: case STATE_ARCHIVE: fprintf(stderr, "Resuming interrupted backup\n"); break; } return state; } static void on_sigalrm(int) { _Exit(1); } static int cmd_wait(long timeout) { if(timeout == 0) { if(lck_try(LOCK_WAIT)) return 0; return 1; } if(timeout > 0 && (unsigned long)timeout <= UINT_MAX) { signal(SIGALRM, on_sigalrm); alarm((unsigned)timeout); } lck_wait(LOCK_WAIT); return 0; } static int cmd_wal(const char *name) { size_t name_len = strlen(name); if(name_len < 8 || name_len >= 192 || !!memcmp(name, "pg_wal/", 7) || memchr(name + 7, '/', name_len - 7)) { fprintf(stderr, "Invalid WAL file name '%s'\n", name); return 1; } int flags = S_WANT_ARCHIVE; // current is renamed to next, so by opening next first, we ensure that they // are distinct. int next_dfd = openat(backup_dfd, "next/pg_wal", O_RDONLY | O_DIRECTORY | O_CLOEXEC); if(next_dfd < 0 && errno != ENOENT) { fprintf(stderr, "Failed to open $PGBAK/next/pg_wal: %m; will force new snapshot\n"); flags |= S_WANT_NEW_SNAPSHOT; } int cur_dfd = openat(backup_dfd, "current/pg_wal", O_RDONLY | O_DIRECTORY | O_CLOEXEC); if(cur_dfd < 0) { if(errno != ENOENT) { fprintf(stderr, "Failed to open directory $PGBAK/current/pg_wal: %m\n"); return 1; } fprintf(stderr, "Directory $PGBAK/current/pg_wal does not exist; will request new snapshot\n"); if(next_dfd < 0) { flags = S_WANT_SNAPSHOT; } } if(cur_dfd < 0 && next_dfd < 0) { fprintf(stderr, "Skipping %s\n", name); goto skip; } char name_out[256]; memcpy(name_out, name + 7, name_len - 7); memcpy(name_out + name_len - 7, WAL_EXT, strlen(WAL_EXT) + 1); if( (cur_dfd >= 0 && faccessat(cur_dfd, name_out, F_OK, AT_SYMLINK_NOFOLLOW) >= 0) || (next_dfd >= 0 && faccessat(next_dfd, name_out, F_OK, AT_SYMLINK_NOFOLLOW) >= 0) ) { // Assume we've gone back in WAL history fprintf(stderr, "Backup of %s already exists; forcing snapshot\n", name); if(cur_dfd >= 0) close(cur_dfd); if(next_dfd >= 0) close(next_dfd); flags = S_WANT_NEW_SNAPSHOT; goto skip; } int out_fd = openat(backup_dfd, ".", O_WRONLY | O_TMPFILE | O_CLOEXEC, 0600); if(out_fd < 0) { fprintf(stderr, "Failed to create WAL backup file at $PGBAK/current: %m\n"); goto fail_dir; } int in_fd = open(name, O_RDONLY | O_CLOEXEC); if(in_fd < 0) { fprintf(stderr, "Failed to open WAL file %s: %m\n", name); goto fail; } fprintf(stderr, "Archiving %s\n", name); if(cld_pipe(in_fd, out_fd, WAL_COMPRESS_ARGV)) { fprintf(stderr, "Failed to create WAL backup\n"); goto fail; } char fd_path[32]; sprintf(fd_path, "/proc/self/fd/%d", out_fd); if(cur_dfd >= 0 && linkat(AT_FDCWD, fd_path, cur_dfd, name_out, AT_SYMLINK_FOLLOW) < 0) { fprintf(stderr, "Failed to link WAL backup: %m\n"); goto fail; } if(next_dfd >= 0 && linkat(AT_FDCWD, fd_path, next_dfd, name_out, AT_SYMLINK_FOLLOW) < 0) { fprintf(stderr, "Failed to link WAL backup: %m\n"); goto fail; } close(out_fd); if(cur_dfd >= 0 && fsync(cur_dfd) < 0) { fprintf(stderr, "fsync(): %m\n"); goto fail_dir; } if(next_dfd >= 0 && fsync(next_dfd) < 0) { fprintf(stderr, "fsync(): %m\n"); goto fail_dir; } close(cur_dfd); skip: flags = bak_begin(flags); if(flags > 0) { pid_t r = fork(); if(r < 0) { fprintf(stderr, "fork(): %m\n"); abort(); } if(r == 0) { bak_work(flags); _Exit(0); } } return 0; fail: close(out_fd); fail_dir: if(cur_dfd >= 0) close(cur_dfd); if(next_dfd >= 0) close(next_dfd); return 1; } static int cmd_sync(bool force_archive, bool force_snapshot) { int flags = 0; if(force_archive) flags |= S_WANT_ARCHIVE; if(force_snapshot) flags |= S_WANT_NEW_SNAPSHOT; flags = bak_begin(flags); if(flags < 0) { return cmd_wait(-1); } bak_work(flags); return 0; } int main(int argc, char **argv) { if(argc == 1) goto usage; const char *backup_dir = getenv("PGBAK"); if(!backup_dir || backup_dir[0] != '/') { fprintf(stderr, "$PGBAK not set to an absolute path\n"); return 1; } const char *op = argv[1]; bool is_try_wait = !strcmp(op, "try-wait"); backup_dfd = open(backup_dir, O_RDONLY | O_DIRECTORY | O_CLOEXEC); if(backup_dfd < 0) { if(is_try_wait && errno == ENOENT) { return 0; } fprintf(stderr, "Failed to open $PGBAK directory '%s': %m\n", backup_dir); return 1; } int lck_flags = O_RDWR | (is_try_wait ? 0 : O_CREAT) | O_CLOEXEC; lck_fd = openat(backup_dfd, "pgbak.lock", lck_flags, 0600); if(lck_fd < 0) { if(is_try_wait && errno == ENOENT) { return 0; } fprintf(stderr, "Failed to open $PGBAK/pgbak.lock: %m\n"); return 1; } if(!strcmp(op, "wal")) { if(argc != 3) goto usage; return cmd_wal(argv[2]); } if(!strcmp(op, "sync")) { if(argc != 2) goto usage; return cmd_sync(false, false); } if(!strcmp(op, "force-sync")) { if(argc != 2) goto usage; return cmd_sync(true, false); } if(!strcmp(op, "full-sync")) { if(argc != 2) goto usage; return cmd_sync(true, true); } if(is_try_wait || !strcmp(op, "wait")) { if(argc != 2 && argc != 3) goto usage; long timeout = -1; if(argc > 2) { char *ep = NULL; timeout = strtol(argv[2], &ep, 10); if(timeout < 0 || ep == argv[2] || *ep != 0) goto usage; } return cmd_wait(timeout); } usage: char *arg0 = argv[0]; fprintf(stderr, "Usage:\n\t%s wal PATH\n\t%s sync\n\t%s force-sync\n\t%s wait [TIMEOUT_SEC]\n", arg0, arg0, arg0, arg0); return 1; }