aboutsummaryrefslogtreecommitdiff
path: root/pgbak.c
diff options
context:
space:
mode:
authorHristo Venev <hristo@venev.name>2023-10-17 22:44:23 +0300
committerHristo Venev <hristo@venev.name>2023-10-17 22:44:23 +0300
commit42d544a9c03aa4b682189f2274c5c1bea346d635 (patch)
tree3496b2c89573e87ea4a3ac97b16028ee7560bc33 /pgbak.c
parent8c89da4825322660a912b717ba83326151e0866e (diff)
Remember and retry running operation
Diffstat (limited to 'pgbak.c')
-rw-r--r--pgbak.c317
1 files changed, 188 insertions, 129 deletions
diff --git a/pgbak.c b/pgbak.c
index 439d643..8fa265f 100644
--- a/pgbak.c
+++ b/pgbak.c
@@ -1,6 +1,6 @@
// SPDX-License-Identifier: LGPL-3.0-or-later
//
-// Copyright 2021 Hristo Venev
+// Copyright 2021-2023 Hristo Venev
#define _GNU_SOURCE
#include <dirent.h>
@@ -24,20 +24,39 @@ static int backup_dfd;
static int lck_fd;
-static const char *const LOCK_KIND[] = {
- // The `state` lock must be held while doing state-related updates, for
- // example modifying the "archive needed" flag.
- "state",
+// ===== Lock indices =====
+
+// LOCK_STATE must be held while doing state-related updates, for example
+// modifying the "archive needed" flag.
+#define LOCK_STATE 0
+
+// LOCK_BACKUP is held by the archiver process. It is try-acquired with
+// the state lock held, and then the state lock is released.
+#define LOCK_BACKUP 1
- // The `backup` lock is held by the archiver process.
- "backup",
+// LOCK_WAIT is acquired by the archiver process after successfully acquiring
+// the backup`
+// lock. It is also acquired by processes that wait for the archiver to
+// finish.
+#define LOCK_WAIT 2
- // The `wait` lock is acquired by the archiver process after the `backup`
- // lock. It is also acquired by processes that wait for the archiver to
- // finish.
- "wait",
+static const char *const LOCK_KIND[] = {
+ [LOCK_STATE] = "state",
+ [LOCK_BACKUP] = "backup",
+ [LOCK_WAIT] = "wait",
};
+
+// ===== State flags =====
+#define STATE_SHIFT 4 // low=pending, high=running
+#define STATE_MASK 0x0f
+
+// We need to run the backup script.
+#define STATE_ARCHIVE 0x01
+// A new full snapshot needs to be made.
+#define STATE_SNAPSHOT 0x02
+
+
static void lck_release(int pos) {
struct flock l = {
.l_type = F_UNLCK,
@@ -77,26 +96,33 @@ static bool lck_try(int pos) {
};
while(true) {
if(fcntl(lck_fd, F_OFD_SETLK, &l) >= 0) return true;
- if(errno == EINTR) continue;
- if(errno == EAGAIN || errno == EACCES) return false;
- fprintf(stderr, "Failed to acquire %s lock: %m\n", LOCK_KIND[pos]);
- abort();
+ switch(errno) {
+ case EAGAIN:
+ case EACCES:
+ return false;
+ case EINTR:
+ break;
+ default:
+ fprintf(stderr, "Failed to acquire %s lock: %m\n", LOCK_KIND[pos]);
+ abort();
+ }
}
}
-static int lck_read(int pos) {
+static int lck_read(void) {
unsigned char b;
- ssize_t r = pread(lck_fd, &b, 1, pos);
+ ssize_t r = pread(lck_fd, &b, 1, 0);
if(r < 0) {
fprintf(stderr, "Failed to read lock file: %m\n");
abort();
}
- if(r >= 1) return b;
- return -1;
+ if(r == 0) return 0;
+ return b;
}
-static void lck_write(int pos, unsigned char b) {
- ssize_t r = pwrite(lck_fd, &b, 1, pos);
+static void lck_write(int v) {
+ unsigned char b = (unsigned char)v;
+ ssize_t r = pwrite(lck_fd, &b, 1, 0);
if(r < 0) {
fprintf(stderr, "Failed to read lock file: %m\n");
abort();
@@ -114,11 +140,21 @@ static pid_t cld_spawn(int in_fd, int out_fd, int chdir_fd, const char *const *a
abort();
}
if(child == 0) {
- if(chdir_fd >= 0 && fchdir(chdir_fd) < 0) _exit(1);
- if(in_fd >= 0 && dup2(in_fd, 0) < 0) _exit(1);
- if(out_fd >= 0 && dup2(out_fd, 1) < 0) _exit(1);
+ if(chdir_fd >= 0 && fchdir(chdir_fd) < 0) {
+ perror("fchdir");
+ _Exit(1);
+ }
+ if(in_fd >= 0 && dup2(in_fd, 0) < 0) {
+ perror("dup2");
+ _Exit(1);
+ }
+ if(out_fd >= 0 && dup2(out_fd, 1) < 0) {
+ perror("dup2");
+ _Exit(1);
+ }
execvpe(argv[0], (char*const*)argv, environ);
- _exit(1);
+ perror("execve");
+ _Exit(1);
}
return child;
}
@@ -290,31 +326,8 @@ static bool should_pgbasebackup(int subdfd) {
return wal_count > 8 && (wal_size >> 1) >= base_size;
}
-// Should we become an archiver? If so, acquire `backup` and `wait` locks.
-static bool bak_begin(bool force) {
- bool r = false;
- lck_wait(0);
-
- if(!lck_try(1)) {
- if(force) lck_write(0, 2);
- goto end;
- }
-
- if(!force && lck_read(0) <= 0) {
- lck_release(1);
- goto end;
- }
-
- lck_wait(2);
- r = true;
-
-end:
- lck_release(0);
- return r;
-}
-
-// Archiver
-static void bak_work(void) {
+// Archiver. Called in a subprocess with LOCK_STATE.
+static void bak_work(int state) {
unsigned int backoff = 4 * 512;
char name[32];
char ts_s[32];
@@ -322,76 +335,79 @@ static void bak_work(void) {
"../scripts/backup", name, ts_s, NULL,
};
+ lck_wait(LOCK_WAIT);
+
+ bool did_snapshot = false;
int subdfd = -1;
+ bool refresh_ts = true;
- {
- ssize_t r = readlinkat(backup_dfd, "current", name, sizeof(name));
- if(r < 0) {
- if(errno != ENOENT) {
- fprintf(stderr, "Failed to readlink() $PGBAK/current: %m\n");
- exit(1);
+ // Skip reopening if a full snapshot was requested.
+ while(true) {
+ if((state & STATE_ARCHIVE) && subdfd < 0) {
+ ssize_t r = readlinkat(backup_dfd, "current", name, sizeof(name));
+ if(r < 0) {
+ if(errno != ENOENT) {
+ fprintf(stderr, "Failed to readlink() $PGBAK/current: %m\n");
+ _Exit(1);
+ }
+ state = STATE_SNAPSHOT;
+ } else {
+ if((size_t)r >= sizeof(name)) {
+ fprintf(stderr, "Symlink $PGBAK/current target too long\n");
+ _Exit(1);
+ }
+ name[r] = 0;
+ subdfd = openat(backup_dfd, name, O_RDONLY | O_DIRECTORY | O_CLOEXEC);
+ if(subdfd < 0) {
+ fprintf(stderr, "Failed to open() $PGBAK/%s: %m\n", name);
+ state = STATE_SNAPSHOT;
+ }
}
- } else if((size_t)r >= sizeof(name)) {
- fprintf(stderr, "Symlink $PGBAK/current target too long\n");
- exit(1);
- } else {
- name[r] = 0;
- subdfd = openat(backup_dfd, name, O_RDONLY | O_DIRECTORY | O_CLOEXEC);
}
- }
-
- bool want_ts = true;
- while(true) {
- lck_write(0, 1);
- bool want_snapshot = false;
- const char *failed = NULL;
- if(subdfd >= 0) {
- if(want_ts) {
- struct timespec ts;
- if(clock_gettime(CLOCK_REALTIME, &ts) < 0) {
- fprintf(stderr, "clock_gettime(CLOCK_REALTIME): %m\n");
- abort();
- }
- sprintf(ts_s, "%lld", (long long)ts.tv_sec);
- want_ts = false;
+ if((state & STATE_ARCHIVE) && refresh_ts) {
+ refresh_ts = false;
+ struct timespec ts;
+ if(clock_gettime(CLOCK_REALTIME, &ts) < 0) {
+ fprintf(stderr, "clock_gettime(CLOCK_REALTIME): %m\n");
+ abort();
}
+ sprintf(ts_s, "%lld", (long long)ts.tv_sec);
+ }
+
+ lck_write(state << STATE_SHIFT);
+ lck_release(LOCK_STATE);
+ const char *failed = NULL;
+ if(state & STATE_ARCHIVE) {
pid_t child = cld_spawn(-1, -1, subdfd, cmd);
if(cld_wait(child, cmd[0])) {
failed = "backup script";
} else {
- fprintf(stderr, "Finished backup %s-%s\n", name, ts_s);
+ fprintf(stderr, "Finished archive %s-%s\n", name, ts_s);
}
-
- if(should_pgbasebackup(subdfd)) {
+ if(!(state & STATE_SNAPSHOT) && !did_snapshot && should_pgbasebackup(subdfd)) {
+ state |= STATE_SNAPSHOT;
+ }
+ if(!failed) {
+ state &= ~STATE_ARCHIVE;
+ }
+ } else if(state & STATE_SNAPSHOT) {
+ if(subdfd >= 0) {
close(subdfd);
- want_snapshot = true;
}
- } else {
- want_snapshot = true;
- }
-
- if(want_snapshot) {
subdfd = bak_snapshot(name);
if(subdfd < 0) {
failed = "pg_basebackup";
+ } else {
+ fprintf(stderr, "Finished snapshot %s\n", name);
+ did_snapshot = true;
+ refresh_ts = true;
+ state = STATE_ARCHIVE;
}
- }
-
- lck_wait(0);
- int r = lck_read(0);
- if(r < 0) {
- fprintf(stderr, "Lock file truncated: %m\n");
- abort();
- }
- if(r > 1) {
- want_ts = true;
- } else if(!failed) {
- lck_write(0, 0);
+ } else {
break;
}
- lck_release(0);
if(failed) {
backoff += backoff >> 2;
@@ -409,23 +425,46 @@ static void bak_work(void) {
} else {
backoff = 4 * 512;
}
+
+ lck_wait(LOCK_STATE);
+ int nstate = lck_read();
+ state |= nstate & STATE_MASK;
+ if(nstate & STATE_ARCHIVE) refresh_ts = true;
}
}
+// Maybe start an archiver. If an archiver is already running, return -1.
+// Otherwise return state flags (0 if nothing needs to be done).
+static int bak_begin(int flags) {
+ lck_wait(LOCK_STATE);
+
+ int state = lck_read();
+ if(!lck_try(LOCK_BACKUP)) {
+ if(flags & ~state) {
+ lck_write(state | flags);
+ }
+ return -1;
+ }
+
+ // We got the backup lock. If anything was running, it must have crashed.
+ // Mark it as pending.
+ return flags | (state & STATE_MASK) | (state >> STATE_SHIFT);
+}
+
static void on_sigalrm(int) {
_Exit(1);
}
static int cmd_wait(long timeout) {
if(timeout == 0) {
- if(lck_try(2)) return 0;
+ if(lck_try(LOCK_WAIT)) return 0;
return 1;
}
if(timeout > 0 && (unsigned long)timeout <= UINT_MAX) {
signal(SIGALRM, on_sigalrm);
alarm((unsigned)timeout);
}
- lck_wait(2);
+ lck_wait(LOCK_WAIT);
return 0;
}
@@ -436,10 +475,14 @@ static int cmd_wal(const char *name) {
return 1;
}
+ int flags = STATE_ARCHIVE;
int wal_dfd = openat(backup_dfd, "current/pg_wal", O_RDONLY | O_DIRECTORY | O_CLOEXEC);
if(wal_dfd < 0) {
- if(errno == ENOENT) goto do_full;
- fprintf(stderr, "Failed to open $PGBAK/current/pg_wal: %m\n");
+ if(errno == ENOENT) {
+ fprintf(stderr, "Directory $PGBAK/current/pg_wal does not exist; skipping %s and doing snapshot\n", name);
+ goto skip;
+ }
+ fprintf(stderr, "Failed to open directory $PGBAK/current/pg_wal: %m\n");
return 1;
}
@@ -447,9 +490,14 @@ static int cmd_wal(const char *name) {
memcpy(name_out, name + 7, name_len - 7);
memcpy(name_out + name_len - 7, WAL_EXT, strlen(WAL_EXT) + 1);
- if(faccessat(wal_dfd, name_out, R_OK, AT_SYMLINK_NOFOLLOW) >= 0) {
- fprintf(stderr, "WAL backup exists at $PGBAK/current/%s, skipping\n", name_out);
- return 0;
+ if(faccessat(wal_dfd, name_out, F_OK, AT_SYMLINK_NOFOLLOW) >= 0) {
+ // Assume we've gone back in time
+ fprintf(stderr, "Backup of %s already exists; forcing snapshot\n", name);
+ close(wal_dfd);
+ // Prevent misdirected WALs or double snapshot
+ (void) unlinkat(backup_dfd, "current", 0);
+ flags = STATE_SNAPSHOT;
+ goto skip;
}
int out_fd = openat(wal_dfd, ".", O_WRONLY | O_TMPFILE | O_CLOEXEC, 0600);
@@ -485,18 +533,20 @@ static int cmd_wal(const char *name) {
}
close(wal_dfd);
-do_full:
- if(!bak_begin(true)) return 0;
-
- pid_t r = fork();
- if(r < 0) {
- fprintf(stderr, "fork(): %m\n");
- abort();
+skip:
+ flags = bak_begin(flags);
+ if(flags > 0) {
+ pid_t r = fork();
+ if(r < 0) {
+ fprintf(stderr, "fork(): %m\n");
+ abort();
+ }
+ if(r == 0) {
+ bak_work(flags);
+ _Exit(0);
+ }
}
- if(r != 0) return 0;
-
- bak_work();
- exit(0);
+ return 0;
fail:
close(out_fd);
@@ -505,6 +555,18 @@ fail_dir:
return 1;
}
+static int cmd_sync(bool force_archive, bool force_snapshot) {
+ int flags = 0;
+ if(force_archive) flags |= STATE_ARCHIVE;
+ if(force_snapshot) flags |= STATE_SNAPSHOT;
+ flags = bak_begin(flags);
+ if(flags < 0) {
+ return cmd_wait(-1);
+ }
+ bak_work(flags);
+ return 0;
+}
+
int main(int argc, char **argv) {
if(argc == 1) goto usage;
@@ -523,7 +585,7 @@ int main(int argc, char **argv) {
lck_fd = openat(backup_dfd, "pgbak.lock", O_RDWR | O_CREAT | O_CLOEXEC, 0600);
if(lck_fd < 0) {
fprintf(stderr, "Failed to open $PGBAK/pgbak.lock: %m\n");
- return 1;
+ _Exit(1);
}
const char *op = argv[1];
@@ -535,20 +597,17 @@ int main(int argc, char **argv) {
if(!strcmp(op, "sync")) {
if(argc != 2) goto usage;
- if(bak_begin(false)) {
- bak_work();
- return 0;
- }
- return cmd_wait(-1);
+ return cmd_sync(false, false);
}
if(!strcmp(op, "force-sync")) {
if(argc != 2) goto usage;
- if(bak_begin(true)) {
- bak_work();
- return 0;
- }
- return cmd_wait(-1);
+ return cmd_sync(true, false);
+ }
+
+ if(!strcmp(op, "full-sync")) {
+ if(argc != 2) goto usage;
+ return cmd_sync(true, true);
}
if(!strcmp(op, "wait")) {