aboutsummaryrefslogblamecommitdiff
path: root/lib/ix.c
blob: 0a6af19801c2b92e20b78f978ca1152335c8255f (plain) (tree)






















































































































































































































                                                                                                                  
// SPDX-License-Identifier: LGPL-3.0-or-later

#include "impl.h"

static bool hkvs_ix_add_fast(char *ix, size_t id, const char *key, size_t key_size, uint8_t bits, const char *hk);

static int hkvs_ix_build_try(hkvs_table *t, hkvs_io *io, uint8_t ix_bits) {
	if(ix_bits > 59) return -EFBIG;
	uint64_t ix_size_64 = HKVS_IX_SIZE(ix_bits);
	if(ix_size_64 > SIZE_MAX) return -EFBIG;
	size_t ix_size = (size_t)ix_size_64;

	char ix_key[HKVS_IX_KEY_SIZE];

	int r = io->random(io, ix_key, HKVS_IX_KEY_SIZE);
	if(r < 0) return r;

	char *ix;
	uint64_t fi;
	r = io->create(io, &fi, &ix, ix_size);
	if(r < 0) return r;

	size_t key_size = hkvs_read_u8(t->slab.param + HKVS_PARAM_TABLE_KEYSZ);
	size_t n = hkvs_slab_size(&t->slab);
	for(size_t i = 0; i < n; i++) {
		if(hkvs_slab_mark(&t->slab, i) == 0) continue;
		const char *key = hkvs_slab_at(&t->slab, i);
		if(!hkvs_ix_add_fast(ix, i, key, key_size, ix_bits, ix_key)) {
			io->close(io, fi, ix, ix_size);
			io->unlink(io, fi);
			return 0;
		}
	}

	uint64_t old_fi = hkvs_read_u64(t->slab.param + HKVS_PARAM_TABLE_IXFI);
	if(old_fi) {
		if(t->ix) {
			io->close(io, old_fi, t->ix, t->ix_size);
		}
		io->unlink(io, old_fi);
	}

	t->ix = ix;
	t->ix_size = ix_size;

	hkvs_write_u8(t->slab.param + HKVS_PARAM_TABLE_IXBIT, ix_bits);
	hkvs_write_u64(t->slab.param + HKVS_PARAM_TABLE_IXFI, fi);
	memcpy(t->slab.param + HKVS_PARAM_TABLE_IXKEY, ix_key, HKVS_IX_KEY_SIZE);

	return 1;
}

int hkvs_ix_build(hkvs_table *t, hkvs_io *io) {
	size_t count = 0;
	{
		size_t n = hkvs_slab_size(&t->slab);
		for(size_t i = 0; i < n; i++) {
			if(hkvs_slab_mark(&t->slab, i)) count++;
		}
	}

	uint8_t bits = 4;
	if(count > 0) {
		size_t load = count - 1;
		while(load >= 13) {
			load /= 2;
			bits++;
		}
	}

	size_t attempts = 0;
	while(true) {
		int r = hkvs_ix_build_try(t, io, bits);
		if(r < 0) return r;
		if(r) return 0;

		attempts++;
		if(attempts == 8 || attempts == 64) {
			bits++;
		}
	}
}

int hkvs_ix_add(hkvs *c, hkvs_table *t, size_t id) {
	const char *key = hkvs_slab_at(&t->slab, id);

	char *ix = t->ix;
	uint8_t bits = hkvs_table__ix_bits(t);
	size_t key_size = hkvs_table__key_size(t);
	const char *hk = hkvs_table__ix_key(t);

	if(hkvs_ix_add_fast(ix, id, key, key_size, bits, hk)) return 0;

	return hkvs_ix_build(t, c->io);
}

hkvs_rid hkvs_record_find(hkvs *c, hkvs_table *t, const char *key) {
	(void)c;

	const char *hk = hkvs_table__ix_key(t);
	size_t key_size = hkvs_table__key_size(t);
	uint64_t hv = hkvs_hash(hk, key, key_size);

	uint8_t bits = hkvs_table__ix_bits(t);
	size_t entries = hkvs_slab_size(&t->slab);
	char *ix = t->ix;

	size_t size = (size_t)1 << bits;
	size_t mask = size - 1;

	for(size_t disp = 0; disp < HKVS_IX_MAX_DISP; disp++) {
		size_t at = ((size_t)hv + disp) & mask;
		char *ent = ix + at * HKVS_IX_ENTRY;
		uint64_t v = hkvs_read_u64(ent);
		if(v == 0) return HKVS_INVALID_RID;
		if(v > entries) goto fail;
		uint64_t h = hkvs_read_u64(ent + 8);
		if(h != hv) {
			if(((at - h) & mask) < disp) break;
			continue;
		}

		size_t id = (size_t)(v - 1);
		char *found = hkvs_slab_at(&t->slab, id);
		if(!memcmp(key, found, key_size)) {
			return (hkvs_rid){id};
		}
	}

fail:
	return HKVS_INVALID_RID;
}

void hkvs_ix_remove(hkvs_table *t, size_t id) {
	const char *key = hkvs_slab_at(&t->slab, id);

	uint8_t bits = hkvs_table__ix_bits(t);
	const char *hk = hkvs_table__ix_key(t);
	size_t key_size = hkvs_table__key_size(t);
	char *ix = t->ix;

	uint64_t hv = hkvs_hash(hk, key, key_size);
	size_t size = (size_t)1 << bits;
	size_t mask = size - 1;

	uint64_t vv = (uint64_t)id + 1;

	char *ent;
	for(size_t disp = 0; disp < HKVS_IX_MAX_DISP; disp++) {
		size_t at = ((size_t)hv + disp) & mask;
		ent = ix + at * HKVS_IX_ENTRY;
		uint64_t v = hkvs_read_u64(ent);
		if(v == vv) {
			if(hkvs_read_u64(ent + 8) != hv) goto corrupted;
			while(true) {
				size_t next = (at + 1) & mask;
				char *e2 = ix + next * HKVS_IX_ENTRY;
				v = hkvs_read_u64(e2);
				if(v == 0) break;
				uint64_t h = hkvs_read_u64(e2 + 8);
				if((h & mask) == next) break;
				hkvs_write_u64(ent, v);
				hkvs_write_u64(ent + 8, h);
				at = next;
				ent = e2;
			}
			hkvs_write_u64(ent, 0);
			hkvs_write_u64(ent + 8, 0);
			return;
		}
		if(v == 0) goto corrupted;
	}

corrupted:
	hkvs_die("database corrupted");
}

bool hkvs_ix_add_fast(char *ix, size_t id, const char *key, size_t key_size, uint8_t bits, const char *hk) {
	uint64_t h_new = hkvs_hash(hk, key, key_size);
	size_t size = (size_t)1 << bits;
	size_t mask = size - 1;

	uint64_t v_new = (uint64_t)id + 1;

	for(size_t disp = 0; disp < HKVS_IX_MAX_DISP; disp++) {
		size_t at = ((size_t)h_new + disp) & mask;
		char *ent = ix + at * HKVS_IX_ENTRY;
		uint64_t v_disp = hkvs_read_u64(ent);
		if(v_disp == 0) {
			hkvs_write_u64(ent, v_new);
			hkvs_write_u64(ent + 8, h_new);
			return true;
		}
		uint64_t h_disp = hkvs_read_u64(ent + 8);
		if(((at - h_disp) & mask) < disp) {
			hkvs_write_u64(ent, v_new);
			hkvs_write_u64(ent + 8, h_new);
			while(true) {
				at = (at + 1) & mask;
				ent = ix + at * HKVS_IX_ENTRY;
				uint64_t v2 = hkvs_read_u64(ent);
				uint64_t h2 = hkvs_read_u64(ent + 8);
				hkvs_write_u64(ent, v_disp);
				hkvs_write_u64(ent + 8, h_disp);
				if(v2 == 0) return true;
				v_disp = v2;
				h_disp = h2;
				if(((at - h_disp) & mask) >= HKVS_IX_MAX_DISP - 1) return false;
			}
			return true;
		}
	}

	return false;
}