// SPDX-License-Identifier: LGPL-3.0-or-later
//
// Copyright 2021 Hristo Venev
#define _GNU_SOURCE
#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <signal.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/file.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>
#include "config.h"
// Directory fd of $PGBAK, opened by main().
static int backup_dfd;
// Fd of $PGBAK/pgbak.lock. Bytes 0, 1 and 2 of this file are locked
// independently (one lock per LOCK_KIND entry). Byte 0 also holds the
// "archive needed" flag, accessed via lck_read()/lck_write():
//   0 or missing: nothing pending;
//   1: set by the archiver at the start of every round;
//   2: another round was requested while an archiver was running.
static int lck_fd;
// Lock names for diagnostics, indexed by byte offset in the lock file.
static const char *const LOCK_KIND[] = {
	// Must be held while doing state-related updates, for example reading
	// or modifying the "archive needed" flag.
	[0] = "state",
	// Held by the archiver process.
	[1] = "backup",
	// Taken by the archiver after `backup`, and also by processes that wait
	// for the archiver to finish.
	[2] = "wait",
};
// Drop the OFD lock on byte `pos` of the lock file.
// Retried when interrupted by a signal; any other failure aborts.
static void lck_release(int pos) {
	struct flock unlock_req = {
		.l_type = F_UNLCK,
		.l_whence = SEEK_SET,
		.l_start = pos,
		.l_len = 1,
	};
	while(fcntl(lck_fd, F_OFD_SETLKW, &unlock_req) < 0) {
		if(errno != EINTR) {
			fprintf(stderr, "Failed to release %s lock: %m\n", LOCK_KIND[pos]);
			abort();
		}
	}
}
// Block until the exclusive (F_WRLCK) OFD lock on byte `pos` of the lock
// file is acquired. Retried when interrupted by a signal; any other failure
// aborts.
static void lck_wait(int pos) {
	struct flock lock_req = {
		.l_type = F_WRLCK,
		.l_whence = SEEK_SET,
		.l_start = pos,
		.l_len = 1,
	};
	while(fcntl(lck_fd, F_OFD_SETLKW, &lock_req) < 0) {
		if(errno != EINTR) {
			fprintf(stderr, "Failed to acquire %s lock: %m\n", LOCK_KIND[pos]);
			abort();
		}
	}
}
// Non-blocking attempt to take the exclusive OFD lock on byte `pos`.
// Returns true if it was acquired, or false if a conflicting lock is held
// (EAGAIN/EACCES). EINTR is retried; any other failure aborts.
static bool lck_try(int pos) {
	struct flock lock_req = {
		.l_type = F_WRLCK,
		.l_whence = SEEK_SET,
		.l_start = pos,
		.l_len = 1,
	};
	while(fcntl(lck_fd, F_OFD_SETLK, &lock_req) < 0) {
		switch(errno) {
		case EINTR:
			continue;
		case EAGAIN:
		case EACCES:
			return false;
		default:
			fprintf(stderr, "Failed to acquire %s lock: %m\n", LOCK_KIND[pos]);
			abort();
		}
	}
	return true;
}
// Read the byte stored at offset `pos` of the lock file.
// Returns its value (0-255), or -1 if the file ends before `pos`.
// Aborts on I/O errors.
static int lck_read(int pos) {
	unsigned char byte;
	ssize_t got = pread(lck_fd, &byte, 1, pos);
	if(got < 0) {
		fprintf(stderr, "Failed to read lock file: %m\n");
		abort();
	}
	return got > 0 ? (int)byte : -1;
}
// Store byte `b` at offset `pos` of the lock file.
// Retried when interrupted by a signal (like the lck_* locking helpers);
// any other failure, or a zero-length write, aborts.
static void lck_write(int pos, unsigned char b) {
	ssize_t r;
	do {
		r = pwrite(lck_fd, &b, 1, pos);
	} while(r < 0 && errno == EINTR);
	if(r < 0) {
		// This is a write failure; the message used to say "read".
		fprintf(stderr, "Failed to write lock file: %m\n");
		abort();
	}
	if(r == 0) {
		fprintf(stderr, "Failed to write lock file: Empty write\n");
		abort();
	}
}
// Fork, then exec argv[0] (searched in $PATH) with arguments `argv`.
// In the child, the fds are applied in this order; a negative fd leaves that
// setting inherited:
//   - fchdir() to `chdir_fd`;
//   - dup2 `in_fd` onto stdin;
//   - dup2 `out_fd` onto stdout.
// The child exits with status 1 if any of these steps, or the exec, fails.
// Returns the child's pid in the parent; aborts if fork() fails.
static pid_t cld_spawn(int in_fd, int out_fd, int chdir_fd, const char *const *argv) {
	pid_t pid = fork();
	if(pid < 0) {
		fprintf(stderr, "fork(): %m\n");
		abort();
	}
	if(pid > 0) return pid;
	// Child: only async-signal-safe calls until exec.
	bool ready = (chdir_fd < 0 || fchdir(chdir_fd) >= 0)
		&& (in_fd < 0 || dup2(in_fd, 0) >= 0)
		&& (out_fd < 0 || dup2(out_fd, 1) >= 0);
	if(ready) execvpe(argv[0], (char *const *)argv, environ);
	_exit(1);
}
// Reap child `child` (called `cld` in diagnostics).
// Returns its exit status, or 128 + the signal number if a signal killed it.
// Any non-zero outcome is reported on stderr. Aborts if waitpid() fails or
// returns a status that is neither "exited" nor "signaled".
static int cld_wait(pid_t child, const char *cld) {
	int status;
	if(waitpid(child, &status, 0) < 0) {
		fprintf(stderr, "waitpid(): %m\n");
		abort();
	}
	if(WIFSIGNALED(status)) {
		int sig = WTERMSIG(status);
		fprintf(stderr, "Child process '%s' killed by signal %d\n", cld, sig);
		return 128 + sig;
	}
	if(!WIFEXITED(status)) {
		fprintf(stderr, "waitpid(): Unknown status\n");
		abort();
	}
	int code = WEXITSTATUS(status);
	if(code != 0) fprintf(stderr, "Child process '%s' exited with status %d\n", cld, code);
	return code;
}
// Run `argv` with stdin from `in_fd` and stdout to `out_fd` (negative means
// inherit), and wait for it to finish.
// Ownership: the parent's copy of `in_fd` is closed right after the spawn;
// `out_fd` stays open and is fsync()ed when the child exits with status 0.
// Returns the cld_wait() result, or -1 if that fsync() fails.
static int cld_pipe(int in_fd, int out_fd, const char *const *argv) {
	pid_t pid = cld_spawn(in_fd, out_fd, -1, argv);
	if(in_fd >= 0) close(in_fd);
	int status = cld_wait(pid, argv[0]);
	if(status != 0 || out_fd < 0) return status;
	if(fsync(out_fd) >= 0) return 0;
	fprintf(stderr, "fsync(): %m\n");
	return -1;
}
// Create a new snapshot directory $PGBAK/<seconds since the epoch>. It gets
// an empty pg_wal/ subdirectory and a base backup named BASE_OUT, produced by
// piping BASE_BACKUP_ARGV through BASE_COMPRESS_ARGV.
// The directory name is written to `name`, which must have room for a decimal
// `long long` (bak_work() passes a char[32]).
//
// $PGBAK/current is repointed at the new directory before the base backup
// starts. cmd_wal() writes into current/pg_wal, so WAL archived during the
// backup lands in the new snapshot.
//
// Returns an O_DIRECTORY fd for the new snapshot. On failure, returns -1
// after trying to remove the directories it created. $PGBAK/current is not
// restored.
static int bak_snapshot(char *name) {
	{
		struct timespec ts;
		if(clock_gettime(CLOCK_REALTIME, &ts) < 0) {
			fprintf(stderr, "clock_gettime(CLOCK_REALTIME): %m\n");
			abort();
		}
		sprintf(name, "%lld", (long long)ts.tv_sec);
	}
	if(mkdirat(backup_dfd, name, 0700) < 0) {
		fprintf(stderr, "Failed to create $PGBAK/%s: %m\n", name);
		return -1;
	}
	int subdfd = openat(backup_dfd, name, O_RDONLY | O_DIRECTORY | O_CLOEXEC);
	if(subdfd < 0) {
		fprintf(stderr, "Failed to open $PGBAK/%s: %m\n", name);
		goto fail_rmdir;
	}
	if(mkdirat(subdfd, "pg_wal", 0700) < 0) {
		fprintf(stderr, "Failed to create $PGBAK/%s/pg_wal: %m\n", name);
		goto fail_close;
	}
	// The base backup goes into an unnamed temporary file. It is linked in
	// as BASE_OUT only after both children succeeded and it was fsync()ed.
	int out_fd = openat(subdfd, ".", O_WRONLY | O_TMPFILE | O_CLOEXEC, 0600);
	if(out_fd < 0) {
		fprintf(stderr, "Failed to create $PGBAK/%s/%s: %m\n", name, BASE_OUT);
		goto fail_clean;
	}
	(void)unlinkat(backup_dfd, "current", 0);
	if(symlinkat(name, backup_dfd, "current") < 0) {
		fprintf(stderr, "Failed to update $PGBAK/current: %m\n");
		goto fail;
	}
	// O_CLOEXEC keeps the raw pipe ends out of both children. The dup2() in
	// cld_spawn() clears the flag on the stdin/stdout copy each child needs.
	// With a plain pipe(), the BASE_BACKUP_ARGV child also inherited the read
	// end. Its writes could then never fail with EPIPE, and it could block
	// forever if the compressor exited before draining the pipe.
	int pfd[2];
	if(pipe2(pfd, O_CLOEXEC) < 0) {
		fprintf(stderr, "pipe2(): %m\n");
		goto fail;
	}
	fprintf(stderr, "Creating full backup in $PGBAK/%s/%s\n", name, BASE_OUT);
	pid_t pid_backup = cld_spawn(-1, pfd[1], -1, BASE_BACKUP_ARGV);
	close(pfd[1]);
	// cld_pipe() closes pfd[0] and fsync()s out_fd on success.
	if(cld_pipe(pfd[0], out_fd, BASE_COMPRESS_ARGV)) {
		(void)kill(pid_backup, SIGKILL);
		(void)waitpid(pid_backup, NULL, 0);
		goto fail;
	}
	if(cld_wait(pid_backup, BASE_BACKUP_ARGV[0])) {
		goto fail;
	}
	char fd_path[32];
	sprintf(fd_path, "/proc/self/fd/%d", out_fd);
	if(linkat(AT_FDCWD, fd_path, subdfd, BASE_OUT, AT_SYMLINK_FOLLOW) < 0) {
		fprintf(stderr, "Failed to link base backup: %m\n");
		goto fail;
	}
	close(out_fd);
	if(fsync(subdfd) < 0 || fsync(backup_dfd) < 0) {
		fprintf(stderr, "fsync(): %m\n");
		abort();
	}
	return subdfd;
fail:
	close(out_fd);
fail_clean:
	// Best effort: the removals fail, and leave the directories in place, if
	// WAL was already archived into pg_wal/.
	(void)unlinkat(subdfd, "pg_wal", AT_REMOVEDIR);
fail_close:
	close(subdfd);
fail_rmdir:
	(void)unlinkat(backup_dfd, name, AT_REMOVEDIR);
	return -1;
}
// Decide whether the snapshot in `subdfd` should be replaced by a fresh base
// backup. Returns true in any of these cases:
//   - BASE_OUT cannot be stat()ed or is not a regular file;
//   - pg_wal/ cannot be opened;
//   - pg_wal/ holds more than 8 entries whose combined size is at least
//     twice the size of BASE_OUT.
// Aborts if the directory cannot be iterated.
static bool should_pgbasebackup(int subdfd) {
	struct stat base_st;
	if(fstatat(subdfd, BASE_OUT, &base_st, AT_SYMLINK_NOFOLLOW) < 0) {
		fprintf(stderr, "Failed to stat %s: %m\n", BASE_OUT);
		return true;
	}
	if(!S_ISREG(base_st.st_mode)) {
		fprintf(stderr, "Expected regular file at %s\n", BASE_OUT);
		return true;
	}
	int wal_dfd = openat(subdfd, "pg_wal", O_RDONLY | O_DIRECTORY | O_CLOEXEC);
	if(wal_dfd < 0) {
		fprintf(stderr, "Failed to open $PGBAK/current/pg_wal: %m\n");
		return true;
	}
	// The DIR stream takes ownership of wal_dfd; closedir() closes it.
	DIR *dir = fdopendir(wal_dfd);
	if(dir == NULL) {
		fprintf(stderr, "fdopendir(): %m\n");
		abort();
	}
	off_t total = 0;
	off_t entries = 0;
	for(;;) {
		errno = 0;
		const struct dirent *de = readdir(dir);
		if(de == NULL) {
			if(errno != 0) {
				fprintf(stderr, "readdir(): %m\n");
				abort();
			}
			break;
		}
		if(strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0) continue;
		entries++;
		// Entries that vanish or cannot be stat()ed still count, with size 0.
		struct stat wal_st;
		if(fstatat(wal_dfd, de->d_name, &wal_st, AT_SYMLINK_NOFOLLOW) >= 0) total += wal_st.st_size;
	}
	closedir(dir);
	return entries > 8 && (total >> 1) >= base_st.st_size;
}
// Decide, under the `state` lock, whether this process becomes the archiver.
// - If the `backup` lock is free and either `force` is set or the state
//   byte is positive: keep `backup`, take `wait`, and return true.
// - If another process holds `backup` and `force` is set: set the state byte
//   to 2 so the running archiver does another round (bak_work() keeps looping
//   while it reads a value above 1), and return false.
// - Otherwise return false without holding any lock.
// The `state` lock is always released before returning.
static bool bak_begin(bool force) {
	bool become_archiver = false;
	lck_wait(0);
	if(lck_try(1)) {
		if(force || lck_read(0) > 0) {
			lck_wait(2);
			become_archiver = true;
		} else {
			lck_release(1);
		}
	} else if(force) {
		lck_write(0, 2);
	}
	lck_release(0);
	return become_archiver;
}
// Archiver main loop. The caller must already hold the `backup` and `wait`
// locks (see bak_begin()).
//
// Each round:
//   1. sets the state byte to 1;
//   2. runs "../scripts/backup NAME TS" with the current snapshot directory
//      as its working directory;
//   3. takes a new snapshot via bak_snapshot() if there is no usable snapshot
//      or should_pgbasebackup() asks for one.
// The loop ends, with the state byte reset to 0, after a round that had no
// errors and during which the byte was not raised above 1. A raised byte
// makes the next round use a fresh timestamp. Failed rounds are retried after
// a randomized, growing delay capped at roughly five minutes.
//
// Returns with the `state` lock still held. Exits with status 1 if the
// $PGBAK/current symlink cannot be read or its target is too long.
static void bak_work(void) {
	// Retry delay, in units of 1/512 s (see the timespec conversion below).
	unsigned int backoff = 4 * 512;
	char name[32];
	char ts_s[32];
	const char *const cmd[] = {
		"../scripts/backup", name, ts_s, NULL,
	};
	int subdfd = -1;
	{
		ssize_t r = readlinkat(backup_dfd, "current", name, sizeof(name));
		if(r < 0) {
			if(errno != ENOENT) {
				fprintf(stderr, "Failed to readlink() $PGBAK/current: %m\n");
				exit(1);
			}
		} else if((size_t)r >= sizeof(name)) {
			fprintf(stderr, "Symlink $PGBAK/current target too long\n");
			exit(1);
		} else {
			name[r] = 0;
			// If this fails (e.g. a dangling link), subdfd stays -1 and the
			// first round takes a new snapshot.
			subdfd = openat(backup_dfd, name, O_RDONLY | O_DIRECTORY | O_CLOEXEC);
		}
	}
	bool want_ts = true;
	while(true) {
		// Start of a round. bak_begin(true) raises this to 2 if it is called
		// while the round runs.
		lck_write(0, 1);
		bool want_snapshot = false;
		const char *failed = NULL;
		if(subdfd >= 0) {
			if(want_ts) {
				struct timespec ts;
				if(clock_gettime(CLOCK_REALTIME, &ts) < 0) {
					fprintf(stderr, "clock_gettime(CLOCK_REALTIME): %m\n");
					abort();
				}
				sprintf(ts_s, "%lld", (long long)ts.tv_sec);
				want_ts = false;
			}
			pid_t child = cld_spawn(-1, -1, subdfd, cmd);
			if(cld_wait(child, cmd[0])) {
				failed = "backup script";
			} else {
				fprintf(stderr, "Finished backup %s-%s\n", name, ts_s);
			}
			if(should_pgbasebackup(subdfd)) {
				close(subdfd);
				want_snapshot = true;
			}
		} else {
			want_snapshot = true;
		}
		if(want_snapshot) {
			subdfd = bak_snapshot(name);
			if(subdfd < 0) {
				failed = "pg_basebackup";
			}
		}
		lck_wait(0);
		int r = lck_read(0);
		if(r < 0) {
			// lck_read() returns -1 only at end of file (it aborts on I/O
			// errors itself), so errno would carry nothing meaningful here.
			fprintf(stderr, "Lock file truncated\n");
			abort();
		}
		if(r > 1) {
			want_ts = true;
		} else if(!failed) {
			// Done. The `state` lock is intentionally left held.
			lck_write(0, 0);
			break;
		}
		lck_release(0);
		if(failed) {
			backoff += backoff >> 2;
			if(backoff > 153600) backoff = 153600;
			// Up to ~2 s of jitter. Zero-initialized so that a failing
			// getentropy() cannot leave the value indeterminate.
			unsigned rnd = 0;
			(void)getentropy(&rnd, sizeof(rnd));
			backoff += rnd & 1023;
			struct timespec tv = {
				.tv_sec = backoff / 512,
				.tv_nsec = backoff % 512 * 1953125,
			};
			fprintf(stderr, "Error in %s, retrying in %u.%09u seconds\n", failed, (unsigned)tv.tv_sec, (unsigned)tv.tv_nsec);
			(void)nanosleep(&tv, NULL);
		} else {
			backoff = 4 * 512;
		}
	}
}
// SIGALRM handler installed by cmd_wait(): the wait timed out, so terminate
// immediately with status 1. _Exit() is async-signal-safe.
static void on_sigalrm(int sig) {
	(void)sig;
	_Exit(1);
}
// Wait until no archiver is running by taking the `wait` lock. The lock is
// not released here; it stays held until the process exits.
//   timeout == 0: try once; return 1 if the lock is currently held (by the
//                 archiver or another waiter).
//   timeout > 0:  give up after `timeout` seconds; the SIGALRM handler exits
//                 with status 1. Values above UINT_MAX wait indefinitely.
//   timeout < 0:  wait indefinitely.
// Returns 0 once the lock is acquired.
static int cmd_wait(long timeout) {
	if(timeout == 0) return lck_try(2) ? 0 : 1;
	bool timed = timeout > 0 && (unsigned long)timeout <= UINT_MAX;
	if(timed) {
		signal(SIGALRM, on_sigalrm);
		alarm((unsigned)timeout);
	}
	lck_wait(2);
	// Cancel the pending alarm. Otherwise it could still fire after the lock
	// was acquired and turn this success into exit status 1.
	if(timed) alarm(0);
	return 0;
}
// Archive one WAL segment, then request an archiver run.
// `name` must have the form "pg_wal/<file>" and is opened relative to the
// current directory. The segment is compressed with WAL_COMPRESS_ARGV into
// $PGBAK/current/pg_wal/<file>WAL_EXT via an unnamed temporary file, which
// is linked in only after it was fully written and fsync()ed.
// If $PGBAK/current/pg_wal does not exist, the segment is not copied; only
// the archiver is triggered, and it then takes a new snapshot.
// Returns 0 on success or if the backup file already exists, 1 on error.
static int cmd_wal(const char *name) {
	size_t name_len = strlen(name);
	if(name_len < 8 || name_len >= 192 || memcmp(name, "pg_wal/", 7) != 0 || memchr(name + 7, '/', name_len - 7)) {
		fprintf(stderr, "Invalid WAL file name '%s'\n", name);
		return 1;
	}
	int wal_dfd = openat(backup_dfd, "current/pg_wal", O_RDONLY | O_DIRECTORY | O_CLOEXEC);
	if(wal_dfd < 0) {
		if(errno == ENOENT) goto do_full;
		fprintf(stderr, "Failed to open $PGBAK/current/pg_wal: %m\n");
		return 1;
	}
	// Backup file name: the segment's base name followed by WAL_EXT. WAL_EXT
	// comes from config.h, so its length is checked rather than assumed.
	char name_out[256];
	size_t base_len = name_len - 7;
	size_t ext_len = strlen(WAL_EXT);
	if(base_len + ext_len >= sizeof(name_out)) {
		fprintf(stderr, "WAL backup file name for '%s' is too long\n", name);
		goto fail_dir;
	}
	memcpy(name_out, name + 7, base_len);
	memcpy(name_out + base_len, WAL_EXT, ext_len + 1);
	if(faccessat(wal_dfd, name_out, R_OK, AT_SYMLINK_NOFOLLOW) >= 0) {
		fprintf(stderr, "WAL backup exists at $PGBAK/current/pg_wal/%s, skipping\n", name_out);
		close(wal_dfd);
		return 0;
	}
	int out_fd = openat(wal_dfd, ".", O_WRONLY | O_TMPFILE | O_CLOEXEC, 0600);
	if(out_fd < 0) {
		fprintf(stderr, "Failed to create WAL backup file at $PGBAK/current: %m\n");
		goto fail_dir;
	}
	int in_fd = open(name, O_RDONLY | O_CLOEXEC);
	if(in_fd < 0) {
		fprintf(stderr, "Failed to open WAL file %s: %m\n", name);
		goto fail;
	}
	fprintf(stderr, "Archiving %s\n", name);
	// cld_pipe() closes in_fd and fsync()s out_fd on success.
	if(cld_pipe(in_fd, out_fd, WAL_COMPRESS_ARGV)) {
		fprintf(stderr, "Failed to create WAL backup\n");
		goto fail;
	}
	char fd_path[32];
	sprintf(fd_path, "/proc/self/fd/%d", out_fd);
	if(linkat(AT_FDCWD, fd_path, wal_dfd, name_out, AT_SYMLINK_FOLLOW) < 0) {
		fprintf(stderr, "Failed to link WAL backup: %m\n");
		goto fail;
	}
	close(out_fd);
	if(fsync(wal_dfd) < 0) {
		fprintf(stderr, "fsync(): %m\n");
		goto fail_dir;
	}
	close(wal_dfd);
do_full:
	// Request an archiver run. If this process becomes the archiver, fork:
	// the parent reports success immediately, and the child runs the
	// archiver. The child shares lck_fd's open file description, so it keeps
	// the OFD locks after the parent exits.
	if(!bak_begin(true)) return 0;
	pid_t r = fork();
	if(r < 0) {
		fprintf(stderr, "fork(): %m\n");
		abort();
	}
	if(r != 0) return 0;
	bak_work();
	exit(0);
fail:
	close(out_fd);
fail_dir:
	close(wal_dfd);
	return 1;
}
// Entry point. $PGBAK must be set to the absolute path of the backup
// directory; $PGBAK/pgbak.lock is created if missing. Commands:
//   wal PATH     archive one WAL segment (see cmd_wal())
//   sync         run the archiver if the state byte says an archive is
//                needed; otherwise wait until no archiver is running
//   force-sync   run the archiver unconditionally, or, if one is already
//                running, ask it for another round and wait for it
//   wait [SEC]   wait for a running archiver, optionally with a timeout
// Prints usage and returns 1 for unknown commands or wrong argument counts.
int main(int argc, char **argv) {
	// `< 2` also covers argc == 0, where argv[1] must not be read.
	if(argc < 2) goto usage;
	const char *backup_dir = getenv("PGBAK");
	if(!backup_dir || backup_dir[0] != '/') {
		fprintf(stderr, "$PGBAK not set to an absolute path\n");
		return 1;
	}
	backup_dfd = open(backup_dir, O_RDONLY | O_DIRECTORY | O_CLOEXEC);
	if(backup_dfd < 0) {
		fprintf(stderr, "Failed to open $PGBAK directory '%s': %m\n", backup_dir);
		return 1;
	}
	lck_fd = openat(backup_dfd, "pgbak.lock", O_RDWR | O_CREAT | O_CLOEXEC, 0600);
	if(lck_fd < 0) {
		fprintf(stderr, "Failed to open $PGBAK/pgbak.lock: %m\n");
		return 1;
	}
	const char *op = argv[1];
	if(!strcmp(op, "wal")) {
		if(argc != 3) goto usage;
		return cmd_wal(argv[2]);
	}
	if(!strcmp(op, "sync")) {
		if(argc != 2) goto usage;
		if(bak_begin(false)) {
			bak_work();
			return 0;
		}
		return cmd_wait(-1);
	}
	if(!strcmp(op, "force-sync")) {
		if(argc != 2) goto usage;
		if(bak_begin(true)) {
			bak_work();
			return 0;
		}
		return cmd_wait(-1);
	}
	if(!strcmp(op, "wait")) {
		if(argc != 2 && argc != 3) goto usage;
		long timeout = -1;
		if(argc > 2) {
			char *ep = NULL;
			timeout = strtol(argv[2], &ep, 10);
			if(timeout < 0 || ep == argv[2] || *ep != 0) goto usage;
		}
		return cmd_wait(timeout);
	}
usage:
	{
		// argv[0] is NULL when argc == 0; never pass NULL to %s.
		const char *arg0 = (argc > 0 && argv[0]) ? argv[0] : "pgbak";
		fprintf(stderr, "Usage:\n\t%s wal PATH\n\t%s sync\n\t%s force-sync\n\t%s wait [TIMEOUT_SEC]\n", arg0, arg0, arg0, arg0);
		return 1;
	}
}