// SPDX-License-Identifier: LGPL-3.0-or-later
//
// Copyright 2021-2023 Hristo Venev
#define _GNU_SOURCE
#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/file.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>
#include "config.h"
// Directory descriptor for $PGBAK. Opened in main() and used as the base of
// the *at() calls that address paths inside the backup directory.
static int backup_dfd;
// Descriptor for $PGBAK/pgbak.lock, opened in main(). The lck_* helpers take
// byte-range locks on it and keep the state byte at offset 0.
static int lck_fd;
// ===== Lock indices =====
// Each index is also the byte offset inside the lock file that the lck_*
// helpers lock (one byte per lock).
// LOCK_STATE must be held while doing state-related updates, for example
// modifying the "archive needed" flag.
#define LOCK_STATE 0
// LOCK_BACKUP is held by the archiver process. It is try-acquired with
// the state lock held, and then the state lock is released.
#define LOCK_BACKUP 1
// LOCK_WAIT is acquired by the archiver process after successfully acquiring
// the backup lock. It is also acquired by processes that wait for the
// archiver to finish.
#define LOCK_WAIT 2
// Human-readable lock names, indexed by lock index; used in error messages.
static const char *const LOCK_KIND[] = {
[LOCK_STATE] = "state",
[LOCK_BACKUP] = "backup",
[LOCK_WAIT] = "wait",
};
// ===== State flags =====
// The state byte kept in the lock file combines one STATE_* step (low bits,
// selected by STATE_MASK) with any number of S_WANT_* request bits.
#define STATE_MASK 0x7
// ----- Request bits (set by callers, consumed by the archiver) -----
// We need to run the backup script.
#define S_WANT_ARCHIVE 0x08
// A "next" full snapshot should be made unless one is in progress.
#define S_WANT_SNAPSHOT 0x10
// A "next" full snapshot needs to be made.
#define S_WANT_NEW_SNAPSHOT 0x20
// ----- Steps of the archiver state machine -----
// Done.
#define STATE_IDLE 0x0
// Run the backup script, goto STATE_IDLE
#define STATE_ARCHIVE 0x1
// Rename next->current, goto STATE_ARCHIVE
#define STATE_PROMOTE 0x2
// Run the backup script, goto STATE_PROMOTE
#define STATE_PREARCHIVE 0x3
// Take a new snapshot, goto STATE_PREARCHIVE
#define STATE_SNAPSHOT 0x4
// Release the one-byte lock at offset `pos` of the lock file.
// Retries when interrupted by a signal; any other failure is fatal.
static void lck_release(int pos) {
    struct flock unlock_req = {
        .l_type = F_UNLCK,
        .l_whence = SEEK_SET,
        .l_start = pos,
        .l_len = 1,
    };
    int rc;
    do {
        rc = fcntl(lck_fd, F_OFD_SETLKW, &unlock_req);
    } while(rc < 0 && errno == EINTR);
    if(rc < 0) {
        fprintf(stderr, "Failed to release %s lock: %m\n", LOCK_KIND[pos]);
        abort();
    }
}
// Block until the one-byte write lock at offset `pos` of the lock file is
// obtained. EINTR restarts the wait; any other error aborts the process.
static void lck_wait(int pos) {
    struct flock lock_req = {
        .l_type = F_WRLCK,
        .l_whence = SEEK_SET,
        .l_start = pos,
        .l_len = 1,
    };
    for(;;) {
        if(fcntl(lck_fd, F_OFD_SETLKW, &lock_req) >= 0) {
            return;
        }
        if(errno != EINTR) {
            fprintf(stderr, "Failed to acquire %s lock: %m\n", LOCK_KIND[pos]);
            abort();
        }
    }
}
// Try to take the one-byte write lock at offset `pos` without blocking.
// Returns true if the lock was obtained and false if it is held elsewhere
// (EAGAIN/EACCES). EINTR is retried; any other error aborts the process.
static bool lck_try(int pos) {
    struct flock lock_req = {
        .l_type = F_WRLCK,
        .l_whence = SEEK_SET,
        .l_start = pos,
        .l_len = 1,
    };
    for(;;) {
        if(fcntl(lck_fd, F_OFD_SETLK, &lock_req) >= 0) {
            return true;
        }
        if(errno == EAGAIN || errno == EACCES) {
            return false;
        }
        if(errno != EINTR) {
            fprintf(stderr, "Failed to acquire %s lock: %m\n", LOCK_KIND[pos]);
            abort();
        }
    }
}
// Read the state byte stored at offset 0 of the lock file.
// An empty file reads as 0; a read error aborts the process.
static int lck_read(void) {
    unsigned char state_byte = 0;
    ssize_t got = pread(lck_fd, &state_byte, 1, 0);
    if(got < 0) {
        fprintf(stderr, "Failed to read lock file: %m\n");
        abort();
    }
    return got > 0 ? state_byte : 0;
}
// Store `v` (truncated to one byte) as the state byte at offset 0 of the lock
// file. Any failure, including a zero-length write, aborts the process.
static void lck_write(int v) {
    unsigned char b = (unsigned char)v;
    ssize_t r = pwrite(lck_fd, &b, 1, 0);
    if(r < 0) {
        // This is a write failure; the message previously said "read".
        fprintf(stderr, "Failed to write lock file: %m\n");
        abort();
    }
    if(r == 0) {
        fprintf(stderr, "Failed to write lock file: Empty write\n");
        abort();
    }
}
// Fork and exec argv[0] (looked up via PATH) with the current environment.
//
// in_fd / out_fd: when >= 0, become the child's stdin / stdout.
// chdir_fd:       when >= 0, the child changes into this directory first.
//
// Returns the child's pid in the parent. A failed fork() aborts; a failure in
// the child is reported with perror() and the child exits with status 1.
static pid_t cld_spawn(int in_fd, int out_fd, int chdir_fd, const char *const *argv) {
    pid_t pid = fork();
    if(pid < 0) {
        fprintf(stderr, "fork(): %m\n");
        abort();
    }
    if(pid > 0) {
        return pid;
    }

    // Child from here on: `step` names the call that failed, if any.
    const char *step;
    if(chdir_fd >= 0 && fchdir(chdir_fd) < 0) {
        step = "fchdir";
    } else if(in_fd >= 0 && dup2(in_fd, 0) < 0) {
        step = "dup2";
    } else if(out_fd >= 0 && dup2(out_fd, 1) < 0) {
        step = "dup2";
    } else {
        execvpe(argv[0], (char*const*)argv, environ);
        // execvpe() only returns on failure.
        step = "execve";
    }
    perror(step);
    _Exit(1);
}
// Wait for `child` to terminate and translate its status into a shell-style
// exit code.
//
// child: pid returned by fork()/cld_spawn().
// cld:   name used for the child in diagnostics.
//
// Returns the exit status if the child exited normally (a non-zero status is
// also logged), or 128 + signal number if it was killed by a signal.
// waitpid() is retried on EINTR, like the lck_* helpers; any other waitpid()
// failure or an unrecognized status aborts the process.
static int cld_wait(pid_t child, const char *cld) {
    int status;
    while(waitpid(child, &status, 0) < 0) {
        if(errno == EINTR) continue;
        fprintf(stderr, "waitpid(): %m\n");
        abort();
    }
    if(WIFEXITED(status)) {
        int r = WEXITSTATUS(status);
        if(r) {
            fprintf(stderr, "Child process '%s' exited with status %d\n", cld, r);
        }
        return r;
    } else if(WIFSIGNALED(status)) {
        int r = WTERMSIG(status);
        fprintf(stderr, "Child process '%s' killed by signal %d\n", cld, r);
        return 128 + r;
    } else {
        fprintf(stderr, "waitpid(): Unknown status\n");
        abort();
    }
}
// Run argv as a filter from in_fd to out_fd and wait for it to finish.
//
// Takes ownership of in_fd: it is closed in this process once the child has
// been spawned. out_fd stays open and is fsync()ed when the child succeeds.
//
// Returns 0 on success, the child's cld_wait() code if it failed, or -1 if
// the fsync() failed.
static int cld_pipe(int in_fd, int out_fd, const char *const *argv) {
    pid_t pid = cld_spawn(in_fd, out_fd, -1, argv);
    if(in_fd >= 0) {
        close(in_fd);
    }

    int status = cld_wait(pid, argv[0]);
    if(status != 0) {
        return status;
    }
    if(out_fd < 0) {
        return 0;
    }
    if(fsync(out_fd) < 0) {
        fprintf(stderr, "fsync(): %m\n");
        return -1;
    }
    return 0;
}
// Create a new full snapshot in $PGBAK/<name>.
//
// Steps: create <name>/ and <name>/pg_wal/, point the "next" symlink at
// <name>, run BASE_BACKUP_ARGV piped into BASE_COMPRESS_ARGV writing to an
// unnamed temporary file, then link that file in as <name>/BASE_OUT and fsync
// the directories. The output only gets a name once both children succeeded.
//
// Returns 0 on success. On failure returns -1 after removing whatever was
// created (the "next" symlink, pg_wal/ and <name>/). A failing final fsync()
// aborts the process.
static int bak_snapshot(char *name) {
    if(mkdirat(backup_dfd, name, 0700) < 0) {
        fprintf(stderr, "Failed to create $PGBAK/%s: %m\n", name);
        return -1;
    }
    int subdfd = openat(backup_dfd, name, O_RDONLY | O_DIRECTORY | O_CLOEXEC);
    if(subdfd < 0) {
        fprintf(stderr, "Failed to open $PGBAK/%s: %m\n", name);
        goto fail_rmdir;
    }
    if(mkdirat(subdfd, "pg_wal", 0700) < 0) {
        fprintf(stderr, "Failed to create $PGBAK/%s/pg_wal: %m\n", name);
        goto fail_close;
    }
    // Unnamed file inside the snapshot directory; it is linked in at the end.
    int out_fd = openat(subdfd, ".", O_WRONLY | O_TMPFILE | O_CLOEXEC, 0600);
    if(out_fd < 0) {
        fprintf(stderr, "Failed to create $PGBAK/%s/%s: %m\n", name, BASE_OUT);
        goto fail_clean;
    }
    (void)unlinkat(backup_dfd, "next", 0);
    if(symlinkat(name, backup_dfd, "next") < 0) {
        fprintf(stderr, "Failed to update $PGBAK/next: %m\n");
        goto fail;
    }
    // O_CLOEXEC keeps the raw pipe ends out of the exec'd children; each child
    // only gets the end that cld_spawn() dup2()s onto its stdin/stdout. With
    // plain pipe() the backup process also held the read end of its own
    // output pipe.
    int pfd[2];
    if(pipe2(pfd, O_CLOEXEC) < 0) {
        fprintf(stderr, "pipe(): %m\n");
        goto fail;
    }
    fprintf(stderr, "Creating full backup in $PGBAK/%s/%s\n", name, BASE_OUT);
    pid_t pid_backup = cld_spawn(-1, pfd[1], -1, BASE_BACKUP_ARGV);
    // Drop our write end so the compressor sees EOF when the backup exits.
    close(pfd[1]);
    // cld_pipe() closes pfd[0] for us.
    if(cld_pipe(pfd[0], out_fd, BASE_COMPRESS_ARGV)) {
        (void)kill(pid_backup, SIGKILL);
        (void)waitpid(pid_backup, NULL, 0);
        goto fail;
    }
    if(cld_wait(pid_backup, BASE_BACKUP_ARGV[0])) {
        goto fail;
    }
    // Give the O_TMPFILE file its name via the /proc/self/fd link.
    char fd_path[32];
    snprintf(fd_path, sizeof(fd_path), "/proc/self/fd/%d", out_fd);
    if(linkat(AT_FDCWD, fd_path, subdfd, BASE_OUT, AT_SYMLINK_FOLLOW) < 0) {
        fprintf(stderr, "Failed to link base backup: %m\n");
        goto fail;
    }
    close(out_fd);
    if(fsync(subdfd) < 0 || fsync(backup_dfd) < 0) {
        fprintf(stderr, "fsync(): %m\n");
        abort();
    }
    close(subdfd);
    return 0;
fail:
    (void)unlinkat(backup_dfd, "next", 0);
    close(out_fd);
fail_clean:
    (void)unlinkat(subdfd, "pg_wal", AT_REMOVEDIR);
fail_close:
    close(subdfd);
fail_rmdir:
    (void)unlinkat(backup_dfd, name, AT_REMOVEDIR);
    return -1;
}
// Decide whether the snapshot directory `subdfd` should be replaced by a new
// full snapshot.
//
// Returns true when BASE_OUT is missing or not a regular file, when pg_wal
// cannot be opened, or when pg_wal holds more than 8 entries whose combined
// size is at least twice the size of BASE_OUT. Returns false otherwise.
// fdopendir()/readdir() failures abort the process.
static bool should_pgbasebackup(int subdfd) {
    struct stat st;
    if(fstatat(subdfd, BASE_OUT, &st, AT_SYMLINK_NOFOLLOW) < 0) {
        fprintf(stderr, "Failed to stat %s: %m; creating new snapshot\n", BASE_OUT);
        return true;
    }
    if(!S_ISREG(st.st_mode)) {
        fprintf(stderr, "Expected regular file at %s; creating new snapshot\n", BASE_OUT);
        return true;
    }
    off_t base_size = st.st_size;
    off_t wal_size = 0;
    off_t wal_count = 0;
    int wal_dfd = openat(subdfd, "pg_wal", O_RDONLY | O_DIRECTORY | O_CLOEXEC);
    if(wal_dfd < 0) {
        fprintf(stderr, "Failed to open $PGBAK/current/pg_wal: %m; creating new snapshot\n");
        return true;
    }
    // wal_dir takes ownership of wal_dfd; closedir() below closes it.
    DIR *wal_dir = fdopendir(wal_dfd);
    if(!wal_dir) {
        fprintf(stderr, "fdopendir(): %m\n");
        abort();
    }
    while(true) {
        // readdir() returns NULL both at end-of-directory and on error; only
        // errno tells them apart.
        errno = 0;
        struct dirent *ent = readdir(wal_dir);
        if(!ent) {
            if(errno == 0) break;
            fprintf(stderr, "readdir(): %m\n");
            abort();
        }
        if(!strcmp(ent->d_name, ".") || !strcmp(ent->d_name, "..")) continue;
        wal_count += 1;
        // An entry that cannot be stat()ed still counts but adds no size.
        if(fstatat(wal_dfd, ent->d_name, &st, AT_SYMLINK_NOFOLLOW) < 0) continue;
        wal_size += st.st_size;
    }
    closedir(wal_dir);
    if(wal_count > 8 && (wal_size >> 1) >= base_size) {
        fprintf(stderr, "WAL directory too big; creating new snapshot\n");
        return true;
    }
    return false;
}
// Resolve the $PGBAK/current symlink and open the directory it names.
//
// name:    buffer that receives the NUL-terminated symlink target.
// namebuf: size of `name` in bytes.
//
// Returns a directory descriptor (O_CLOEXEC) on success. Returns -1 after
// logging when the link cannot be read, the target does not fit in `name`,
// or the target cannot be opened.
static int bak_open_current(char *name, size_t namebuf) {
    ssize_t r = readlinkat(backup_dfd, "current", name, namebuf);
    if(r < 0) {
        // Every errno, ENOENT included, is a readlink failure. The ENOENT case
        // used to be reported as a failed open(); %m already names the cause.
        fprintf(stderr, "Failed to readlink() $PGBAK/current: %m\n");
        return -1;
    }
    // readlinkat() does not NUL-terminate; a full buffer may mean truncation.
    if((size_t)r >= namebuf) {
        fprintf(stderr, "Symlink $PGBAK/current target too long\n");
        return -1;
    }
    name[r] = 0;
    int subdfd = openat(backup_dfd, name, O_RDONLY | O_DIRECTORY | O_CLOEXEC);
    if(subdfd < 0) {
        fprintf(stderr, "Failed to open() $PGBAK/%s: %m\n", name);
    }
    return subdfd;
}
// Archiver. Called with LOCK_STATE held: from a forked child in cmd_wal(), and
// directly in cmd_sync().
//
// Runs the STATE_* state machine until it reaches STATE_IDLE. `state` is the
// value returned by bak_begin(): a STATE_* step in the low bits plus any
// S_WANT_* request bits.
//
// Locking as visible here: LOCK_WAIT is taken up front and never released by
// this function. LOCK_STATE is released before each unit of work and
// re-acquired before the state byte is re-read, so the function returns with
// LOCK_STATE held (the loop breaks before the release).
static void bak_work(int state) {
// Retry delay in units of 1/512 s (see the timespec conversion below);
// the starting value is 4 s.
unsigned int backoff = 4 * 512;
char cur_name[32];
char ts_name[32];
// argv for the backup script; it is run with the current snapshot directory
// as working directory. Slot 3 is filled in per run ("old" or NULL).
const char *archive_cmd[] = {
"../scripts/backup", cur_name, ts_name, NULL, NULL,
};
lck_wait(LOCK_WAIT);
bool did_snapshot = false;
bool refresh_ts = true;
// `flags` keeps the S_WANT_* request bits; `state` is reduced to the step.
int flags = state;
state &= STATE_MASK;
// Each iteration: fold pending requests into the step, persist the step,
// perform one step without LOCK_STATE, then re-read the request bits.
while(true) {
// Apply pending requests. The fallthroughs make the later (stronger)
// checks apply to the earlier states as well.
switch(state) {
case STATE_IDLE:
if(flags & S_WANT_ARCHIVE) state = STATE_ARCHIVE;
[[fallthrough]];
case STATE_ARCHIVE:
if(flags & S_WANT_SNAPSHOT) state = STATE_SNAPSHOT;
[[fallthrough]];
case STATE_PROMOTE:
case STATE_PREARCHIVE:
if(flags & S_WANT_NEW_SNAPSHOT) state = STATE_SNAPSHOT;
break;
default:
state = STATE_SNAPSHOT;
}
// Writing the bare step also clears the request bits in the lock file.
lck_write(state);
if(state == STATE_IDLE) break;
lck_release(LOCK_STATE);
// A new timestamp is taken on the first iteration and for every snapshot
// attempt; it names the snapshot and is passed to the backup script.
if(state == STATE_SNAPSHOT || refresh_ts) {
refresh_ts = false;
struct timespec ts;
if(clock_gettime(CLOCK_REALTIME, &ts) < 0) {
fprintf(stderr, "clock_gettime(CLOCK_REALTIME): %m\n");
abort();
}
sprintf(ts_name, "%lld", (long long)ts.tv_sec);
}
// Set to the name of the failing step; a failure leaves `state` unchanged
// so the same step is retried after the backoff sleep.
const char *failed = NULL;
switch(state) {
case STATE_SNAPSHOT:
if(bak_snapshot(ts_name) < 0) {
failed = "pg_basebackup";
break;
}
fprintf(stderr, "Finished snapshot %s\n", ts_name);
did_snapshot = true;
state = STATE_PREARCHIVE;
break;
case STATE_PROMOTE:
// If "next" cannot be renamed over "current", start over with a snapshot.
if(renameat(backup_dfd, "next", backup_dfd, "current") < 0) {
fprintf(stderr, "Failed to promote $PGBAK/next to current: %m\n");
state = STATE_SNAPSHOT;
break;
}
if(fsync(backup_dfd) < 0) {
fprintf(stderr, "fsync(): %m\n");
abort();
}
state = STATE_ARCHIVE;
break;
case STATE_ARCHIVE:
case STATE_PREARCHIVE:
{
int subdfd = bak_open_current(cur_name, sizeof(cur_name));
if(subdfd < 0) {
// No usable "current": a plain archive falls back to taking a snapshot,
// while a pre-promotion archive is skipped and "next" is promoted.
if(state == STATE_ARCHIVE) {
state = STATE_SNAPSHOT;
} else {
state = STATE_PROMOTE;
}
break;
}
// The extra "old" argument is passed only for the pre-promotion run.
// NOTE(review): its meaning is defined by the backup script, not shown here.
archive_cmd[3] = (state == STATE_PREARCHIVE) ? "old" : NULL;
pid_t child = cld_spawn(-1, -1, subdfd, archive_cmd);
if(cld_wait(child, archive_cmd[0])) {
close(subdfd);
failed = "backup script";
break;
}
fprintf(stderr, "Finished archive %s-%s\n", cur_name, ts_name);
if(state == STATE_PREARCHIVE) {
state = STATE_PROMOTE;
} else if(!did_snapshot && should_pgbasebackup(subdfd)) {
// At most one size-triggered snapshot per bak_work() run.
state = STATE_SNAPSHOT;
} else {
state = STATE_IDLE;
}
close(subdfd);
break;
}
}
if(failed) {
// Grow the delay by 25%, cap it at 153600/512 = 300 s, then add up to
// 1023/512 (about 2 s) of random jitter. The jitter is added to `backoff`
// itself, so it carries over into the next retry.
backoff += backoff >> 2;
if(backoff > 153600) backoff = 153600;
unsigned rnd;
// NOTE(review): the getentropy() result is ignored; if it fails, `rnd` is
// read uninitialized.
(void)getentropy(&rnd, sizeof(rnd));
backoff += rnd & 1023;
// 1/512 s == 1953125 ns.
struct timespec tv = {
.tv_sec = backoff / 512,
.tv_nsec = backoff % 512 * 1953125,
};
fprintf(stderr, "Error in %s, retrying in %u.%09u seconds\n", failed, (unsigned)tv.tv_sec, (unsigned)tv.tv_nsec);
(void)nanosleep(&tv, NULL);
} else {
backoff = 4 * 512;
}
// Pick up any request bits stored in the lock file while we were working.
lck_wait(LOCK_STATE);
flags = lck_read();
}
}
// Maybe start an archiver. If an archiver is already running, return -1.
// Otherwise return state flags (0 if nothing needs to be done).
//
// flags: S_WANT_* request bits to merge into the stored state.
//
// When an archiver is already running (LOCK_BACKUP is held elsewhere), any
// request bits not yet present are written to the lock file so the running
// archiver sees them on its next lck_read(), and LOCK_STATE is released.
// Otherwise the function returns with both LOCK_STATE and LOCK_BACKUP held
// and the caller is expected to run bak_work() (or exit).
static int bak_begin(int flags) {
    lck_wait(LOCK_STATE);
    int state = lck_read();
    if(!lck_try(LOCK_BACKUP)) {
        // Compare against the state as read. Merging `flags` into `state`
        // first made this test always false, so requests were dropped.
        if(flags & ~state) {
            lck_write(state | flags);
        }
        lck_release(LOCK_STATE);
        return -1;
    }
    state |= flags;
    // We got the backup lock. If anything was running, it must have crashed.
    // Mark it as pending.
    switch(state & STATE_MASK) {
    case STATE_SNAPSHOT:
        fprintf(stderr, "Resuming interrupted snapshot\n");
        break;
    case STATE_PROMOTE:
    case STATE_ARCHIVE:
        fprintf(stderr, "Resuming interrupted backup\n");
        break;
    }
    return state;
}
// SIGALRM handler installed by cmd_wait(): the wait timed out, so leave
// immediately with exit status 1.
static void on_sigalrm(int signo) {
    (void)signo;
    _Exit(1);
}
// Wait until LOCK_WAIT can be acquired.
//
// timeout == 0: do not block; return 0 if the lock was obtained, 1 otherwise.
// timeout  > 0: block, but arm a SIGALRM after `timeout` seconds whose
//               handler exits the process with status 1 (only when the value
//               fits in an unsigned int).
// timeout  < 0: block without a time limit.
//
// Returns 0 once the lock has been acquired.
static int cmd_wait(long timeout) {
    if(timeout == 0) {
        return lck_try(LOCK_WAIT) ? 0 : 1;
    }

    bool arm_alarm = timeout > 0 && (unsigned long)timeout <= UINT_MAX;
    if(arm_alarm) {
        signal(SIGALRM, on_sigalrm);
        alarm((unsigned)timeout);
    }

    lck_wait(LOCK_WAIT);
    return 0;
}
// Archive one WAL file and kick off the archiver.
//
// name: path of the form "pg_wal/<file>" (no further '/'), opened relative to
//       the current working directory.
//
// The file is piped through WAL_COMPRESS_ARGV into an unnamed temporary file,
// which is then linked as <file>WAL_EXT into current/pg_wal and/or next/pg_wal
// (whichever exist). Afterwards bak_begin() is called and, if no archiver is
// running, bak_work() is started in a forked child; this function does not
// wait for it.
//
// Returns 0 on success (including the "skip" cases that only request a
// snapshot) and 1 on failure.
static int cmd_wal(const char *name) {
    size_t name_len = strlen(name);
    if(name_len < 8 || name_len >= 192 || !!memcmp(name, "pg_wal/", 7) || memchr(name + 7, '/', name_len - 7)) {
        fprintf(stderr, "Invalid WAL file name '%s'\n", name);
        return 1;
    }
    int flags = S_WANT_ARCHIVE;
    // next is opened before current.
    // NOTE(review): the original comment said this ordering guarantees the two
    // directories are distinct; the archiver promotes by renaming next to
    // current (see STATE_PROMOTE) - confirm the reasoning still holds.
    int next_dfd = openat(backup_dfd, "next/pg_wal", O_RDONLY | O_DIRECTORY | O_CLOEXEC);
    if(next_dfd < 0 && errno != ENOENT) {
        fprintf(stderr, "Failed to open $PGBAK/next/pg_wal: %m; will force new snapshot\n");
        flags |= S_WANT_NEW_SNAPSHOT;
    }
    int cur_dfd = openat(backup_dfd, "current/pg_wal", O_RDONLY | O_DIRECTORY | O_CLOEXEC);
    if(cur_dfd < 0) {
        if(errno != ENOENT) {
            // Log first: close() could clobber the errno that %m reports.
            fprintf(stderr, "Failed to open directory $PGBAK/current/pg_wal: %m\n");
            if(next_dfd >= 0) close(next_dfd);
            return 1;
        }
        fprintf(stderr, "Directory $PGBAK/current/pg_wal does not exist; will request new snapshot\n");
        if(next_dfd < 0) {
            flags = S_WANT_SNAPSHOT;
        }
    }
    if(cur_dfd < 0 && next_dfd < 0) {
        // Nowhere to store the file; just make sure a snapshot gets taken.
        fprintf(stderr, "Skipping %s\n", name);
        goto skip;
    }
    // Output file name: the part after "pg_wal/" plus WAL_EXT.
    char name_out[256];
    memcpy(name_out, name + 7, name_len - 7);
    memcpy(name_out + name_len - 7, WAL_EXT, strlen(WAL_EXT) + 1);
    if(
        (cur_dfd >= 0 && faccessat(cur_dfd, name_out, F_OK, AT_SYMLINK_NOFOLLOW) >= 0) ||
        (next_dfd >= 0 && faccessat(next_dfd, name_out, F_OK, AT_SYMLINK_NOFOLLOW) >= 0)
    ) {
        // Assume we've gone back in WAL history
        fprintf(stderr, "Backup of %s already exists; forcing snapshot\n", name);
        if(cur_dfd >= 0) close(cur_dfd);
        if(next_dfd >= 0) close(next_dfd);
        flags = S_WANT_NEW_SNAPSHOT;
        goto skip;
    }
    // Unnamed temporary file in $PGBAK; it gets its name(s) via linkat() below.
    int out_fd = openat(backup_dfd, ".", O_WRONLY | O_TMPFILE | O_CLOEXEC, 0600);
    if(out_fd < 0) {
        fprintf(stderr, "Failed to create WAL backup file at $PGBAK/current: %m\n");
        goto fail_dir;
    }
    int in_fd = open(name, O_RDONLY | O_CLOEXEC);
    if(in_fd < 0) {
        fprintf(stderr, "Failed to open WAL file %s: %m\n", name);
        goto fail;
    }
    fprintf(stderr, "Archiving %s\n", name);
    // cld_pipe() takes ownership of in_fd and fsync()s out_fd on success.
    if(cld_pipe(in_fd, out_fd, WAL_COMPRESS_ARGV)) {
        fprintf(stderr, "Failed to create WAL backup\n");
        goto fail;
    }
    char fd_path[32];
    snprintf(fd_path, sizeof(fd_path), "/proc/self/fd/%d", out_fd);
    if(cur_dfd >= 0 && linkat(AT_FDCWD, fd_path, cur_dfd, name_out, AT_SYMLINK_FOLLOW) < 0) {
        fprintf(stderr, "Failed to link WAL backup: %m\n");
        goto fail;
    }
    if(next_dfd >= 0 && linkat(AT_FDCWD, fd_path, next_dfd, name_out, AT_SYMLINK_FOLLOW) < 0) {
        fprintf(stderr, "Failed to link WAL backup: %m\n");
        goto fail;
    }
    close(out_fd);
    if(cur_dfd >= 0 && fsync(cur_dfd) < 0) {
        fprintf(stderr, "fsync(): %m\n");
        goto fail_dir;
    }
    if(next_dfd >= 0 && fsync(next_dfd) < 0) {
        fprintf(stderr, "fsync(): %m\n");
        goto fail_dir;
    }
    // Close both directories before forking so the long-running archiver
    // child does not inherit them. Previously next_dfd was never closed here
    // and cur_dfd was closed even when it was -1.
    if(cur_dfd >= 0) close(cur_dfd);
    if(next_dfd >= 0) close(next_dfd);
skip:
    flags = bak_begin(flags);
    if(flags > 0) {
        // We hold the backup lock: run the archiver in the background.
        pid_t r = fork();
        if(r < 0) {
            fprintf(stderr, "fork(): %m\n");
            abort();
        }
        if(r == 0) {
            bak_work(flags);
            _Exit(0);
        }
    }
    return 0;
fail:
    close(out_fd);
fail_dir:
    if(cur_dfd >= 0) close(cur_dfd);
    if(next_dfd >= 0) close(next_dfd);
    return 1;
}
// Run the archiver in the foreground, or wait for one that is already running.
//
// force_archive:  request a run of the backup script (S_WANT_ARCHIVE).
// force_snapshot: request a new full snapshot (S_WANT_NEW_SNAPSHOT).
//
// Returns 0 after running bak_work() itself, otherwise the result of an
// unbounded cmd_wait().
static int cmd_sync(bool force_archive, bool force_snapshot) {
    int request = 0;
    if(force_archive) {
        request |= S_WANT_ARCHIVE;
    }
    if(force_snapshot) {
        request |= S_WANT_NEW_SNAPSHOT;
    }

    int state = bak_begin(request);
    if(state >= 0) {
        bak_work(state);
        return 0;
    }
    // Another archiver holds the backup lock; wait for it to finish.
    return cmd_wait(-1);
}
// Entry point. Opens $PGBAK and its lock file, then dispatches on argv[1]:
//   wal PATH            -> cmd_wal(PATH)
//   sync                -> cmd_sync(false, false)
//   force-sync          -> cmd_sync(true, false)
//   full-sync           -> cmd_sync(true, true)
//   wait [TIMEOUT_SEC]  -> cmd_wait(TIMEOUT_SEC, or -1 when omitted)
// Returns the command's result, or 1 on a usage or setup error.
int main(int argc, char **argv) {
    // argv[0] may be missing when argc == 0; fall back to a fixed name.
    const char *arg0 = (argc > 0 && argv[0]) ? argv[0] : "pgbak";
    if(argc < 2) goto usage;
    const char *backup_dir = getenv("PGBAK");
    if(!backup_dir || backup_dir[0] != '/') {
        fprintf(stderr, "$PGBAK not set to an absolute path\n");
        return 1;
    }
    backup_dfd = open(backup_dir, O_RDONLY | O_DIRECTORY | O_CLOEXEC);
    if(backup_dfd < 0) {
        fprintf(stderr, "Failed to open $PGBAK directory '%s': %m\n", backup_dir);
        return 1;
    }
    lck_fd = openat(backup_dfd, "pgbak.lock", O_RDWR | O_CREAT | O_CLOEXEC, 0600);
    if(lck_fd < 0) {
        fprintf(stderr, "Failed to open $PGBAK/pgbak.lock: %m\n");
        return 1;
    }
    const char *op = argv[1];
    if(!strcmp(op, "wal")) {
        if(argc != 3) goto usage;
        return cmd_wal(argv[2]);
    }
    if(!strcmp(op, "sync")) {
        if(argc != 2) goto usage;
        return cmd_sync(false, false);
    }
    if(!strcmp(op, "force-sync")) {
        if(argc != 2) goto usage;
        return cmd_sync(true, false);
    }
    if(!strcmp(op, "full-sync")) {
        if(argc != 2) goto usage;
        return cmd_sync(true, true);
    }
    if(!strcmp(op, "wait")) {
        if(argc != 2 && argc != 3) goto usage;
        long timeout = -1;
        if(argc > 2) {
            char *ep = NULL;
            // strtol() reports overflow only through errno.
            errno = 0;
            timeout = strtol(argv[2], &ep, 10);
            if(errno == ERANGE || timeout < 0 || ep == argv[2] || *ep != 0) goto usage;
        }
        return cmd_wait(timeout);
    }
usage:
    fprintf(stderr, "Usage:\n\t%s wal PATH\n\t%s sync\n\t%s force-sync\n\t%s full-sync\n\t%s wait [TIMEOUT_SEC]\n", arg0, arg0, arg0, arg0, arg0);
    return 1;
}