// SPDX-License-Identifier: LGPL-3.0-or-later
#include "impl.h"
/*
 * Forward declaration; the definition is at the end of this file.
 * Inserts record `id` (stored as id + 1) under the hash of `key` into the
 * index `ix` of 2^bits entries, without ever resizing or rebuilding it.
 * Returns true when the entry was placed, false when the probe/shift limit
 * (HKVS_IX_MAX_DISP) would be exceeded.
 */
static bool hkvs_ix_add_fast(char *ix, size_t id, const char *key, size_t key_size, uint8_t bits, const char *hk);
/*
 * Make one attempt at building a fresh index of 2^ix_bits entries for `t`.
 *
 * A new random hash key is drawn, a new index object is created through
 * `io`, and every marked slab slot is inserted with hkvs_ix_add_fast().
 *
 * Returns:
 *    1  the new index was fully populated and installed in `t` (the old
 *       index, if any, was closed and unlinked);
 *    0  some insert did not fit; the new index was closed and unlinked and
 *       `t` was left unchanged;
 *   <0  -EFBIG when the requested size cannot be represented, otherwise the
 *       negative value returned by io->random() or io->create().
 */
static int hkvs_ix_build_try(hkvs_table *t, hkvs_io *io, uint8_t ix_bits) {
    if(ix_bits > 59) return -EFBIG;

    uint64_t wanted = HKVS_IX_SIZE(ix_bits);
    if(wanted > SIZE_MAX) return -EFBIG;
    size_t new_size = (size_t)wanted;

    /* Every attempt hashes with its own freshly drawn key. */
    char seed[HKVS_IX_KEY_SIZE];
    int rc = io->random(io, seed, HKVS_IX_KEY_SIZE);
    if(rc < 0) return rc;

    /* NOTE(review): presumably io->create() hands back zero-filled storage,
     * since hkvs_ix_add_fast() treats a 0 reference as "empty" - confirm. */
    char *new_ix;
    uint64_t new_fi;
    rc = io->create(io, &new_fi, &new_ix, new_size);
    if(rc < 0) return rc;

    size_t key_size = hkvs_read_u8(t->slab.param + HKVS_PARAM_TABLE_KEYSZ);
    size_t total = hkvs_slab_size(&t->slab);

    /* Insert every marked slot; stop at the first one that does not fit. */
    bool fits = true;
    for(size_t slot = 0; fits && slot < total; slot++) {
        if(hkvs_slab_mark(&t->slab, slot) == 0) continue;
        const char *key = hkvs_slab_at(&t->slab, slot);
        fits = hkvs_ix_add_fast(new_ix, slot, key, key_size, ix_bits, seed);
    }

    if(!fits) {
        /* Throw the half-built index away; the caller decides whether to retry. */
        io->close(io, new_fi, new_ix, new_size);
        io->unlink(io, new_fi);
        return 0;
    }

    /* Retire the previous index: close it if it is currently open in `t`,
     * and unlink it whenever the table parameters reference one. */
    uint64_t prev_fi = hkvs_read_u64(t->slab.param + HKVS_PARAM_TABLE_IXFI);
    if(prev_fi) {
        if(t->ix) {
            io->close(io, prev_fi, t->ix, t->ix_size);
        }
        io->unlink(io, prev_fi);
    }

    /* Install the new index and record its parameters in the table header. */
    t->ix = new_ix;
    t->ix_size = new_size;
    hkvs_write_u8(t->slab.param + HKVS_PARAM_TABLE_IXBIT, ix_bits);
    hkvs_write_u64(t->slab.param + HKVS_PARAM_TABLE_IXFI, new_fi);
    memcpy(t->slab.param + HKVS_PARAM_TABLE_IXKEY, seed, HKVS_IX_KEY_SIZE);
    return 1;
}
/*
 * (Re)build the index of `t` from the marked slots of its slab.
 *
 * The initial size is 2^4 entries and doubles for as long as the halved
 * (count - 1) value stays at or above 13. Builds are retried with a new hash
 * key until one succeeds; the size is doubled once more after the 8th and
 * after the 64th failed attempt.
 *
 * Returns 0 on success, or the negative value reported by
 * hkvs_ix_build_try().
 *
 * NOTE(review): after the 64th failure the size never grows again, so the
 * loop has no upper bound on the number of attempts.
 */
int hkvs_ix_build(hkvs_table *t, hkvs_io *io) {
    /* Count the marked slots that will have to be indexed. */
    size_t live = 0;
    size_t slots = hkvs_slab_size(&t->slab);
    for(size_t slot = 0; slot < slots; slot++) {
        if(hkvs_slab_mark(&t->slab, slot)) live++;
    }

    /* Pick the starting size from the live count. */
    uint8_t bits = 4;
    for(size_t load = live ? live - 1 : 0; load >= 13; load /= 2) {
        bits++;
    }

    for(size_t failures = 1;; failures++) {
        int r = hkvs_ix_build_try(t, io, bits);
        if(r < 0) return r;
        if(r != 0) return 0;
        /* Repeated failures: give the next attempts a larger table. */
        if(failures == 8 || failures == 64) {
            bits++;
        }
    }
}
/*
 * Add slab slot `id` of table `t` to the table's index.
 *
 * The entry is first inserted in place; when that does not fit, the whole
 * index is rebuilt through hkvs_ix_build() using c->io.
 *
 * Returns 0 on success, otherwise the value returned by hkvs_ix_build().
 */
int hkvs_ix_add(hkvs *c, hkvs_table *t, size_t id) {
    const char *record_key = hkvs_slab_at(&t->slab, id);
    char *index = t->ix;
    uint8_t index_bits = hkvs_table__ix_bits(t);
    size_t key_len = hkvs_table__key_size(t);
    const char *hash_key = hkvs_table__ix_key(t);

    if(!hkvs_ix_add_fast(index, id, record_key, key_len, index_bits, hash_key)) {
        /* No room within the probe limit: fall back to a full rebuild. */
        return hkvs_ix_build(t, c->io);
    }
    return 0;
}
/*
 * Look up the record of table `t` whose key equals `key` (compared over the
 * table's key size).
 *
 * Probes at most HKVS_IX_MAX_DISP consecutive index entries starting at the
 * slot selected by the key's hash. Returns the record id on a match and
 * HKVS_INVALID_RID otherwise. `c` is unused.
 */
hkvs_rid hkvs_record_find(hkvs *c, hkvs_table *t, const char *key) {
    (void)c;
    const char *hash_key = hkvs_table__ix_key(t);
    size_t key_len = hkvs_table__key_size(t);
    uint64_t want = hkvs_hash(hash_key, key, key_len);
    uint8_t bits = hkvs_table__ix_bits(t);
    size_t limit = hkvs_slab_size(&t->slab);
    char *base = t->ix;
    size_t mask = ((size_t)1 << bits) - 1;

    size_t probe = 0;
    while(probe < HKVS_IX_MAX_DISP) {
        size_t slot = ((size_t)want + probe) & mask;
        char *entry = base + slot * HKVS_IX_ENTRY;

        /* First word: record id + 1, with 0 meaning "empty slot". */
        uint64_t ref = hkvs_read_u64(entry);
        if(ref == 0) return HKVS_INVALID_RID;
        /* A reference past the end of the slab cannot be followed. */
        if(ref > limit) return HKVS_INVALID_RID;

        /* Second word: the hash stored with the entry. */
        uint64_t have = hkvs_read_u64(entry + 8);
        if(have == want) {
            size_t id = (size_t)(ref - 1);
            char *stored = hkvs_slab_at(&t->slab, id);
            if(memcmp(key, stored, key_len) == 0) {
                return (hkvs_rid){id};
            }
        } else if(((slot - have) & mask) < probe) {
            /* This entry sits closer to its home slot than we are to ours,
             * so an entry for our key would have been placed before it. */
            return HKVS_INVALID_RID;
        }
        probe++;
    }
    return HKVS_INVALID_RID;
}
/*
 * Remove the index entry that refers to slab slot `id` of table `t`.
 *
 * The entry is located by probing from the slot selected by the hash of the
 * record's key. Once found, the entries that follow are moved back by one
 * slot until an empty slot or an entry already in its home slot is reached,
 * and the last vacated slot is cleared.
 *
 * Calls hkvs_die("database corrupted") when the entry is not found within
 * HKVS_IX_MAX_DISP probes, when an empty slot is reached first, or when the
 * entry's stored hash does not match the key's hash.
 */
void hkvs_ix_remove(hkvs_table *t, size_t id) {
    const char *key = hkvs_slab_at(&t->slab, id);
    uint8_t bits = hkvs_table__ix_bits(t);
    const char *hash_key = hkvs_table__ix_key(t);
    size_t key_len = hkvs_table__key_size(t);
    char *base = t->ix;
    uint64_t want = hkvs_hash(hash_key, key, key_len);
    size_t mask = ((size_t)1 << bits) - 1;
    uint64_t target = (uint64_t)id + 1; /* entries store id + 1 */

    for(size_t probe = 0; probe < HKVS_IX_MAX_DISP; probe++) {
        size_t hole = ((size_t)want + probe) & mask;
        char *hole_ent = base + hole * HKVS_IX_ENTRY;
        uint64_t ref = hkvs_read_u64(hole_ent);

        if(ref != target) {
            if(ref == 0) break; /* chain ended without our entry */
            continue;
        }
        if(hkvs_read_u64(hole_ent + 8) != want) break; /* wrong hash stored */

        /* Close the gap by pulling the following entries back one slot. */
        for(;;) {
            size_t next = (hole + 1) & mask;
            char *next_ent = base + next * HKVS_IX_ENTRY;
            uint64_t next_ref = hkvs_read_u64(next_ent);
            if(next_ref == 0) break;
            uint64_t next_hash = hkvs_read_u64(next_ent + 8);
            /* An entry in its home slot must stay where it is. */
            if((next_hash & mask) == next) break;
            hkvs_write_u64(hole_ent, next_ref);
            hkvs_write_u64(hole_ent + 8, next_hash);
            hole = next;
            hole_ent = next_ent;
        }
        hkvs_write_u64(hole_ent, 0);
        hkvs_write_u64(hole_ent + 8, 0);
        return;
    }
    hkvs_die("database corrupted");
}
/*
 * Insert record `id` into the index `ix` without resizing it.
 *
 * ix        index storage holding 2^bits entries of HKVS_IX_ENTRY bytes;
 *           each entry is a 64-bit record reference (id + 1, 0 = empty)
 *           followed at offset 8 by the 64-bit hash of the record's key
 * id        slab slot of the record being indexed
 * key       key bytes of the record, `key_size` bytes long
 * bits      log2 of the number of entries in `ix`
 * hk        hash key passed to hkvs_hash()
 *
 * Entries are kept ordered by displacement from their home slot: the new
 * entry takes the first empty slot within HKVS_IX_MAX_DISP probes, or is
 * placed in front of the first entry that is closer to its own home slot
 * than the new entry would be, shifting the following run forward by one.
 *
 * Returns true when the entry was stored. Returns false when no slot is
 * available within the probe limit or when the shift would push an entry to
 * a displacement of HKVS_IX_MAX_DISP or more. The insertion is
 * all-or-nothing: on a false return `ix` has not been modified, so the index
 * stays usable even if the caller's subsequent rebuild fails.
 */
bool hkvs_ix_add_fast(char *ix, size_t id, const char *key, size_t key_size, uint8_t bits, const char *hk) {
    uint64_t h_new = hkvs_hash(hk, key, key_size);
    size_t size = (size_t)1 << bits;
    size_t mask = size - 1;
    uint64_t v_new = (uint64_t)id + 1;

    for(size_t disp = 0; disp < HKVS_IX_MAX_DISP; disp++) {
        size_t at = ((size_t)h_new + disp) & mask;
        char *ent = ix + at * HKVS_IX_ENTRY;
        uint64_t v_disp = hkvs_read_u64(ent);
        if(v_disp == 0) {
            /* Free slot within reach: take it. */
            hkvs_write_u64(ent, v_new);
            hkvs_write_u64(ent + 8, h_new);
            return true;
        }
        uint64_t h_disp = hkvs_read_u64(ent + 8);
        if(((at - h_disp) & mask) >= disp) continue;

        /*
         * The occupant is closer to its home slot than we are, so we take
         * its place and shift the following run forward by one slot.
         *
         * Dry run first: walk the run up to the next empty slot and make
         * sure every entry that will move stays below the displacement
         * limit. Nothing is written until the whole shift is known to fit.
         * (The occupant itself has displacement < disp, so it always fits.)
         */
        size_t probe = at;
        while(true) {
            probe = (probe + 1) & mask;
            /* Back at the start: the table has no empty slot at all. */
            if(probe == at) return false;
            char *p = ix + probe * HKVS_IX_ENTRY;
            if(hkvs_read_u64(p) == 0) break;
            uint64_t h = hkvs_read_u64(p + 8);
            if(((probe - h) & mask) >= HKVS_IX_MAX_DISP - 1) return false;
        }

        /* Commit: store the new entry, then carry each displaced entry one
         * slot forward until the carried value lands in the empty slot. */
        hkvs_write_u64(ent, v_new);
        hkvs_write_u64(ent + 8, h_new);
        do {
            at = (at + 1) & mask;
            ent = ix + at * HKVS_IX_ENTRY;
            uint64_t v_next = hkvs_read_u64(ent);
            uint64_t h_next = hkvs_read_u64(ent + 8);
            hkvs_write_u64(ent, v_disp);
            hkvs_write_u64(ent + 8, h_disp);
            v_disp = v_next;
            h_disp = h_next;
        } while(v_disp != 0);
        return true;
    }
    return false;
}