// SPDX-License-Identifier: LGPL-3.0-or-later
#include "impl.h"
// Return a pointer to the stored key of record `rid` in table `t`, or NULL
// when `rid` is beyond the table slab's size or the slot's mark is 0 (a mark
// of 0 is how the rest of this file recognises "record does not exist").
// `c` is unused.
//
// NOTE(review): the bound check accepts rid.id == hkvs_slab_size(); elsewhere
// in this file a slab index check uses `>=`. Confirm which is intended.
const char *hkvs_record_key(hkvs *c, hkvs_table *t, hkvs_rid rid) {
	(void)c;
	if(rid.id <= hkvs_slab_size(&t->slab)) {
		size_t slot = (size_t)rid.id;
		if(hkvs_slab_mark(&t->slab, slot) != 0) {
			return hkvs_slab_at(&t->slab, slot);
		}
	}
	return NULL;
}
// Look up the value of record `rid` in table `t`.
//
// Returns 0 when `rid` is beyond the table slab's size; otherwise returns
// whatever hkvs_record__value() yields with `notfound` = 0, i.e. 1 when the
// record exists (filling *pvalue / *psize if non-NULL), 0 when the slot is
// empty, or a negative error code.
//
// NOTE(review): the bound check accepts rid.id == hkvs_slab_size(); confirm
// against the slab's indexing convention.
int hkvs_record_value(hkvs *c, hkvs_table *t, hkvs_rid rid, char **pvalue, size_t *psize) {
	if(rid.id > hkvs_slab_size(&t->slab)) {
		return 0;
	}
	size_t slot = (size_t)rid.id;
	size_t klen = hkvs_table__key_size(t);
	char *rec_key = hkvs_slab_at(&t->slab, slot);
	return hkvs_record__value(c, t, slot, rec_key, klen, 0, pvalue, psize);
}
// Decode the value of the record stored in slot `id` of table `t`.
//
// `key` points at the record's key and `key_size` is its length; the value
// area starts right after the key. The slot's mark selects the layout:
//   - 0: no record; `notfound` is returned unchanged.
//   - 1..HKVS_MARK_MAX_SMALL: inline value of (mark - 1) bytes stored in the
//     value area itself.
//   - HKVS_MARK_EXT / HKVS_MARK_SLAB: the value area holds a u64 size followed
//     by a u64 reference (external file id, or `slab index | slot << 8`).
//
// On success returns 1, storing the value pointer in *pvalue and the size in
// *psize when those are non-NULL. When `pvalue` is NULL the reference is not
// decoded or validated; only the size is reported. Returns -EFBIG when the
// stored size does not fit in size_t, -EBADMSG for an unknown mark or an
// invalid slab reference, or the negative result of hkvs_extfile_open().
int hkvs_record__value(hkvs *c, hkvs_table *t, size_t id, char *key, size_t key_size, int notfound, char **pvalue, size_t *psize) {
	uint8_t mark = hkvs_slab_mark(&t->slab, id);
	if(mark == 0) {
		return notfound;
	}

	char *area = key + key_size;

	// Inline value: the mark encodes the length.
	if(mark <= HKVS_MARK_MAX_SMALL) {
		if(pvalue) *pvalue = area;
		if(psize) *psize = (size_t)(mark - 1);
		return 1;
	}

	// Out-of-line value: size first, reference second.
	uint64_t stored_size = hkvs_read_u64(area);
	if(stored_size > SIZE_MAX) {
		return -EFBIG;
	}
	size_t size = (size_t)stored_size;

	if(pvalue) {
		uint64_t ref = hkvs_read_u64(area + 8);
		char *data;
		if(mark == HKVS_MARK_EXT) {
			size_t slot;
			int r = hkvs_extfile_open(c, ref, &slot, size);
			if(r < 0) {
				return r;
			}
			hkvs_extfile *file = &c->extfiles[slot];
			hkvs_extfile_pin(c, file);
			data = file->addr;
		} else if(mark == HKVS_MARK_SLAB) {
			uint8_t slab_ix = ref & 255;
			ref >>= 8;
			if(slab_ix >= HKVS_N_SLABS) {
				return -EBADMSG;
			}
			hkvs_slab *vs = &c->slabs[slab_ix];
			if(size > vs->elt_size) {
				return -EBADMSG;
			}
			if(ref > hkvs_slab_size(vs)) {
				return -EBADMSG;
			}
			data = hkvs_slab_at(vs, (size_t)ref);
		} else {
			return -EBADMSG;
		}
		*pvalue = data;
	}

	if(psize) *psize = size;
	return 1;
}
// Resize the value of record `rid` in table `t` to `new_size` bytes, keeping
// at most the first `keep_size` bytes of the old value (clamped to both the
// old and the new size).
//
// Storage classes, chosen from `new_size`:
//   - inline (new_size <= table inline size): value lives in the record itself
//     and the slot mark is new_size + 1;
//   - value slab (hkvs_slab_for_size() < HKVS_N_SLABS): mark HKVS_MARK_SLAB,
//     record holds u64 size + u64 (slot << 8 | slab index);
//   - external file otherwise: mark HKVS_MARK_EXT, record holds u64 size +
//     u64 file id.
//
// Fast paths that keep the existing storage: inline -> inline, slab -> same
// slab, and ext -> ext when keep_size <= HKVS_REWRITE_THRESHOLD (resized in
// place through c->io->resize). Otherwise new storage is allocated, the kept
// prefix is copied, and the old slab slot / external file is released.
//
// If `pvalue` is non-NULL it receives a pointer to the (new) value.
// Returns 0 on success or a negative error code (-EFBIG, -EBADMSG, or the
// failure of an allocation / I/O helper). Calls hkvs_die() if the record does
// not exist.
//
// NOTE(review): the bound check accepts rid.id == hkvs_slab_size(), and the
// old slab slot check accepts val == hkvs_slab_size(); hkvs_record_delete uses
// `>=` for the latter. Confirm the intended indexing convention.
int hkvs_record_resize(hkvs *c, hkvs_table *t, hkvs_rid rid, char **pvalue, size_t new_size, size_t keep_size) {
	hkvs_put_records(c);
	if(rid.id > hkvs_slab_size(&t->slab)) goto wtf;
	size_t id = (size_t)rid.id;
	size_t key_size = hkvs_table__key_size(t);
	uint8_t inl_size = hkvs_table__inl_size(t);
	uint8_t mark = hkvs_slab_mark(&t->slab, id);
	if(mark == 0) goto wtf;
	char *key = hkvs_slab_at(&t->slab, id);
	char *p = key + key_size;
	uint8_t new_slab = hkvs_slab_for_size(new_size);
	uint64_t old_fi = 0;
	size_t old_fi_slot;
	hkvs_slab *old_s = NULL;
	size_t old_sid;
	char *old_value;
	size_t old_size;
	// `<=` matches hkvs_record__value(): every mark up to and including
	// HKVS_MARK_MAX_SMALL denotes an inline value of (mark - 1) bytes.
	if(mark <= HKVS_MARK_MAX_SMALL) {
		if(new_size <= inl_size) {
			// inline -> inline: only the length encoded in the mark changes
			hkvs_slab_set_mark(&t->slab, id, (uint8_t)(new_size + 1));
			if(pvalue) *pvalue = p;
			return 0;
		}
		old_value = p;
		old_size = mark - 1;
	} else {
		uint64_t val = hkvs_read_u64(p);
		if(val > SIZE_MAX) return -EFBIG;
		old_size = (size_t)val;
		val = hkvs_read_u64(p + 8);
		switch(mark) {
		case HKVS_MARK_EXT:
		{
			old_fi = val;
			int r = hkvs_extfile_open(c, old_fi, &old_fi_slot, old_size);
			if(r < 0) return r;
			hkvs_extfile *f = &c->extfiles[old_fi_slot];
			old_value = f->addr;
			// NOTE(review): in-place resize is taken when keep_size is at or
			// below HKVS_REWRITE_THRESHOLD; confirm this is the intended
			// direction of the comparison.
			if(new_slab >= HKVS_N_SLABS && keep_size <= HKVS_REWRITE_THRESHOLD) {
				r = c->io->resize(c->io, old_fi, &f->addr, f->size, new_size);
				if(r < 0) return r;
				f->size = new_size;
				hkvs_write_u64(p, new_size);
				if(pvalue) {
					hkvs_extfile_pin(c, f);
					*pvalue = f->addr;
				}
				return 0;
			}
			break;
		}
		case HKVS_MARK_SLAB:
		{
			uint8_t slab = val & 255;
			val >>= 8;
			if(slab >= HKVS_N_SLABS) return -EBADMSG;
			old_s = &c->slabs[slab];
			if(old_size > old_s->elt_size) return -EBADMSG;
			if(val > hkvs_slab_size(old_s)) return -EBADMSG;
			old_sid = (size_t)val;
			old_value = hkvs_slab_at(old_s, old_sid);
			if(slab == new_slab) {
				// same size class: reuse the slot, only the size changes
				hkvs_write_u64(p, new_size);
				if(pvalue) *pvalue = old_value;
				return 0;
			}
			break;
		}
		default:
			return -EBADMSG;
		}
	}

	// The value has to move to a different storage class.
	if(old_size < keep_size) keep_size = old_size;
	if(new_size < keep_size) keep_size = new_size;

	// Step 1: obtain the new storage WITHOUT touching the record yet, so a
	// failed allocation leaves the record exactly as it was.
	char *new_value;
	uint8_t new_mark;
	uint64_t new_ref = 0;
	int new_inline = new_size <= inl_size;
	if(new_inline) {
		new_mark = (uint8_t)(new_size + 1);
		new_value = p;
	} else if(new_slab < HKVS_N_SLABS) {
		hkvs_slab *s = &c->slabs[new_slab];
		size_t sid;
		int r = hkvs_slab_alloc(c, s, &sid);
		if(r < 0) return r;
		hkvs_slab_set_mark(s, sid, 1);
		new_value = hkvs_slab_at(s, sid);
		new_ref = ((uint64_t)sid << 8) | new_slab;
		new_mark = HKVS_MARK_SLAB;
	} else {
		char *addr;
		uint64_t new_fi;
		int r = hkvs_extfile_create(c, &new_fi, &addr, new_size, !!pvalue);
		if(r < 0) return r;
		new_value = addr;
		new_ref = new_fi;
		new_mark = HKVS_MARK_EXT;
	}

	// Step 2: copy the kept prefix BEFORE writing size/reference into the
	// record. An inline old value lives at `p`, the same bytes the metadata
	// occupies, so writing metadata first would destroy the data to be kept.
	// Source and destination never overlap here: inline -> inline returned
	// early, and every other combination uses distinct storage.
	memcpy(new_value, old_value, keep_size);

	// Step 3: commit the record's metadata and mark.
	if(!new_inline) {
		hkvs_write_u64(p, new_size);
		hkvs_write_u64(p + 8, new_ref);
	}
	hkvs_slab_set_mark(&t->slab, id, new_mark);
	if(pvalue) *pvalue = new_value;

	// Step 4: release the old out-of-line storage, if any.
	if(old_fi) {
		hkvs_extfile_close(c, old_fi_slot);
		c->io->unlink(c->io, old_fi);
	}
	if(old_s) {
		hkvs_slab_discard(old_s, old_sid);
	}
	return 0;
wtf:
	hkvs_die("record does not exist");
}
// Delete record `rid` from table `t`: remove it from the index, discard its
// slot in the table slab, and release any out-of-line value storage (a value
// slab slot, or an external file which is closed if currently open and then
// unlinked through c->io).
//
// Calls hkvs_die("record does not exist") when `rid` is out of range or the
// slot's mark is 0, and hkvs_die("database corrupted") when the stored
// slab/file reference is invalid.
//
// NOTE(review): the record bound check uses `>` while the value-slab slot
// check below uses `>=`; confirm which matches hkvs_slab_size() semantics.
void hkvs_record_delete(hkvs *c, hkvs_table *t, hkvs_rid rid) {
	if(rid.id > hkvs_slab_size(&t->slab)) goto wtf;
	size_t id = (size_t)rid.id;
	size_t klen = hkvs_table__key_size(t);
	uint8_t mark = hkvs_slab_mark(&t->slab, id);
	if(mark == 0) goto wtf;
	char *rec = hkvs_slab_at(&t->slab, id);
	char *area = rec + klen;

	uint64_t ext_fi = 0;
	hkvs_slab *val_slab = NULL;
	size_t val_sid;

	if(mark == HKVS_MARK_SLAB) {
		// reference = value-slab index in the low byte, slot above it
		uint64_t ref = hkvs_read_u64(area + 8);
		uint8_t slab_ix = ref & 255;
		ref >>= 8;
		if(slab_ix >= HKVS_N_SLABS) goto corrupted;
		val_slab = &c->slabs[slab_ix];
		if(ref >= hkvs_slab_size(val_slab)) goto corrupted;
		val_sid = (size_t)ref;
	} else if(mark == HKVS_MARK_EXT) {
		uint64_t fi = hkvs_read_u64(area + 8);
		if(fi == 0) goto corrupted;
		size_t slot;
		if(hkvs_extfile_find(c, fi, &slot)) {
			hkvs_extfile_close(c, slot);
		}
		ext_fi = fi;
	}
	// any other mark: nothing out-of-line to release

	hkvs_ix_remove(t, id);
	hkvs_slab_discard(&t->slab, id);
	if(ext_fi) {
		c->io->unlink(c->io, ext_fi);
	}
	if(val_slab) {
		hkvs_slab_discard(val_slab, val_sid);
	}
	return;
corrupted:
	hkvs_die("database corrupted");
wtf:
	hkvs_die("record does not exist");
}
// Append a new record with key `key` and a value of `size` bytes to table `t`.
//
// A slot is allocated in the table slab and the key (hkvs_table__key_size(t)
// bytes) is copied into it. The value storage is chosen from `size`:
//   - inline when size <= the table's inline size (mark = size + 1);
//   - a value slab slot when hkvs_slab_for_size() < HKVS_N_SLABS
//     (mark HKVS_MARK_SLAB, record holds u64 size + u64 slot << 8 | slab);
//   - otherwise a newly created external file (mark HKVS_MARK_EXT, record
//     holds u64 size + u64 file id).
// The value bytes themselves are not initialised here; the caller writes them
// through *pvalue.
//
// On success returns 1 and stores the new record id in *prid and the value
// pointer in *pvalue when those are non-NULL. On failure returns the negative
// error from the failing helper; if indexing fails, the slot mark is reset to
// 0 and any value storage obtained here is released.
int hkvs_record_append(hkvs *c, hkvs_table *t, hkvs_rid *prid, const char *key, char **pvalue, size_t size) {
	size_t key_size = hkvs_table__key_size(t);
	uint8_t inl_size = hkvs_table__inl_size(t);
	size_t id;
	int r = hkvs_slab_alloc(c, &t->slab, &id);
	if(r < 0) return r;
	char *r_key = hkvs_slab_at(&t->slab, id);
	memcpy(r_key, key, key_size);
	char *p = r_key + key_size;
	char *mark = hkvs_slab_mark_at(&t->slab, id);
	hkvs_slab *s = NULL;
	size_t sid;
	char *value;
	uint64_t ext_fi = 0;
	if(size <= inl_size) {
		hkvs_write_u8(mark, (uint8_t)(size + 1));
		value = p;
	} else {
		uint8_t slab = hkvs_slab_for_size(size);
		hkvs_write_u64(p, size);
		if(slab < HKVS_N_SLABS) {
			s = &c->slabs[slab];
			r = hkvs_slab_alloc(c, s, &sid);
			if(r < 0) return r;
			value = hkvs_slab_at(s, sid);
			hkvs_slab_set_mark(s, sid, 1);
			hkvs_write_u64(p + 8, (uint64_t)sid << 8 | slab);
			hkvs_write_u8(mark, HKVS_MARK_SLAB);
		} else {
			char *addr;
			r = hkvs_extfile_create(c, &ext_fi, &addr, size, !!pvalue);
			// Bail out before publishing the record: on failure `addr` is not
			// valid and the slot mark has not been written yet, exactly as in
			// the slab-allocation failure path above.
			if(r < 0) return r;
			value = addr;
			hkvs_write_u64(p + 8, ext_fi);
			hkvs_write_u8(mark, HKVS_MARK_EXT);
		}
	}
	r = hkvs_ix_add(c, t, id);
	if(r >= 0) {
		if(prid) *prid = (hkvs_rid){id};
		if(pvalue) *pvalue = value;
		return 1;
	}
	// Indexing failed: undo everything done above.
	hkvs_write_u8(mark, 0);
	if(s) {
		hkvs_slab_discard(s, sid);
	}
	if(ext_fi) {
		// NOTE(review): presumably hkvs_extfile_create() placed the new file
		// in the last extfiles slot; verify against its implementation.
		size_t slot = --c->n_extfiles;
		hkvs_io *io = c->io;
		io->close(io, ext_fi, c->extfiles[slot].addr, c->extfiles[slot].size);
		io->unlink(io, ext_fi);
	}
	return r;
}