diff options
author | Hristo Venev <hristo@venev.name> | 2020-06-22 12:32:11 +0300 |
---|---|---|
committer | Hristo Venev <hristo@venev.name> | 2020-06-22 12:54:22 +0300 |
commit | cb2a50691fb0cddb64e1b5a9ed242a6a0b42d503 (patch) | |
tree | 57228e4e9efacb0d2a88d7d8b825a44278075875 /lib/record.c |
Diffstat (limited to 'lib/record.c')
-rw-r--r-- | lib/record.c | 352 |
1 files changed, 352 insertions, 0 deletions
diff --git a/lib/record.c b/lib/record.c new file mode 100644 index 0000000..c9c36d6 --- /dev/null +++ b/lib/record.c @@ -0,0 +1,352 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later + +#include "impl.h" + +const char *hkvs_record_key(hkvs *c, hkvs_table *t, hkvs_rid rid) { + (void)c; + + if(rid.id > hkvs_slab_size(&t->slab)) return NULL; + size_t id = (size_t)rid.id; + if(hkvs_slab_mark(&t->slab, id) == 0) return NULL; + return hkvs_slab_at(&t->slab, id); +} + +int hkvs_record_value(hkvs *c, hkvs_table *t, hkvs_rid rid, char **pvalue, size_t *psize) { + if(rid.id > hkvs_slab_size(&t->slab)) return 0; + size_t id = (size_t)rid.id; + size_t key_size = hkvs_table__key_size(t); + char *key = hkvs_slab_at(&t->slab, id); + return hkvs_record__value(c, t, id, key, key_size, 0, pvalue, psize); +} + +int hkvs_record__value(hkvs *c, hkvs_table *t, size_t id, char *key, size_t key_size, int notfound, char **pvalue, size_t *psize) { + uint8_t mark = hkvs_slab_mark(&t->slab, id); + if(mark == 0) return notfound; + + char *p = key + key_size; + size_t size; + + if(mark <= HKVS_MARK_MAX_SMALL) { + size = mark - 1; + if(!pvalue) goto ret_novalue; + goto ret_value; + } + + uint64_t size_64 = hkvs_read_u64(p); + if(size_64 > SIZE_MAX) return -EFBIG; + size = (size_t)size_64; + + if(!pvalue) goto ret_novalue; + + uint64_t val = hkvs_read_u64(p + 8); + switch(mark) { + case HKVS_MARK_EXT: + { + size_t slot; + int r = hkvs_extfile_open(c, val, &slot, size); + if(r < 0) return r; + + hkvs_extfile *f = &c->extfiles[slot]; + hkvs_extfile_pin(c, f); + p = f->addr; + break; + } + + case HKVS_MARK_SLAB: + { + uint8_t slab = val & 255; + val >>= 8; + + if(slab >= HKVS_N_SLABS) return -EBADMSG; + hkvs_slab *s = &c->slabs[slab]; + if(size > s->elt_size) return -EBADMSG; + + if(val > hkvs_slab_size(s)) return -EBADMSG; + size_t sid = (size_t)val; + + p = hkvs_slab_at(s, sid); + break; + } + + default: + return -EBADMSG; + } + +ret_value: + *pvalue = p; +ret_novalue: + if(psize) *psize = size; + return 1; +} + +int hkvs_record_resize(hkvs *c, hkvs_table *t, hkvs_rid rid, char **pvalue, size_t new_size, size_t keep_size) { + hkvs_put_records(c); + + if(rid.id > hkvs_slab_size(&t->slab)) goto wtf; + size_t id = (size_t)rid.id; + size_t key_size = hkvs_table__key_size(t); + uint8_t inl_size = hkvs_table__inl_size(t); + uint8_t mark = hkvs_slab_mark(&t->slab, id); + if(mark == 0) goto wtf; + char *key = hkvs_slab_at(&t->slab, id); + char *p = key + key_size; + + uint8_t new_slab = hkvs_slab_for_size(new_size); + + uint64_t old_fi = 0; + size_t old_fi_slot; + + hkvs_slab *old_s = NULL; + size_t old_sid; + + char *old_value; + size_t old_size; + + if(mark < HKVS_MARK_MAX_SMALL) { + if(new_size <= inl_size) { + hkvs_slab_set_mark(&t->slab, id, (uint8_t)(new_size + 1)); + if(pvalue) *pvalue = p; + return 0; + } + old_value = p; + old_size = mark - 1; + } else { + uint64_t val = hkvs_read_u64(p); + if(val > SIZE_MAX) return -EFBIG; + old_size = (size_t)val; + val = hkvs_read_u64(p + 8); + + switch(mark) { + case HKVS_MARK_EXT: + { + old_fi = val; + + int r = hkvs_extfile_open(c, old_fi, &old_fi_slot, old_size); + if(r < 0) return r; + hkvs_extfile *f = &c->extfiles[old_fi_slot]; + old_value = f->addr; + + if(new_slab >= HKVS_N_SLABS && keep_size <= HKVS_REWRITE_THRESHOLD) { + r = c->io->resize(c->io, old_fi, &f->addr, f->size, new_size); + if(r < 0) return r; + f->size = new_size; + hkvs_write_u64(p, new_size); + if(pvalue) { + hkvs_extfile_pin(c, f); + *pvalue = f->addr; + } + return 0; + } + + break; + } + + case HKVS_MARK_SLAB: + { + uint8_t slab = val & 255; + val >>= 8; + + if(slab >= HKVS_N_SLABS) return -EBADMSG; + old_s = &c->slabs[slab]; + if(old_size > old_s->elt_size) return -EBADMSG; + + if(val > hkvs_slab_size(old_s)) return -EBADMSG; + old_sid = (size_t)val; + + old_value = hkvs_slab_at(old_s, old_sid); + + if(slab == new_slab) { + hkvs_write_u64(p, new_size); + if(pvalue) *pvalue = old_value; + return 0; + } + break; + } + + default: + return -EBADMSG; + } + } + + if(old_size < keep_size) keep_size = old_size; + if(new_size < keep_size) keep_size = new_size; + + char *new_value; + + if(new_size <= inl_size) { + hkvs_slab_set_mark(&t->slab, id, (uint8_t)(new_size + 1)); + new_value = p; + } else { + new_slab = hkvs_slab_for_size(new_size); + hkvs_write_u64(p, new_size); + if(new_slab < HKVS_N_SLABS) { + hkvs_slab *s = &c->slabs[new_slab]; + size_t sid; + int r = hkvs_slab_alloc(c, s, &sid); + if(r < 0) return r; + hkvs_slab_set_mark(s, sid, 1); + new_value = hkvs_slab_at(s, sid); + hkvs_write_u64(p + 8, ((uint64_t)sid << 8) | new_slab); + hkvs_slab_set_mark(&t->slab, id, HKVS_MARK_SLAB); + } else { + char *addr; + uint64_t new_fi; + int r = hkvs_extfile_create(c, &new_fi, &addr, new_size, !!pvalue); + if(r < 0) return r; + new_value = addr; + hkvs_write_u64(p + 8, new_fi); + hkvs_slab_set_mark(&t->slab, id, HKVS_MARK_EXT); + } + } + + memcpy(new_value, old_value, keep_size); + if(pvalue) *pvalue = new_value; + + if(old_fi) { + hkvs_extfile_close(c, old_fi_slot); + c->io->unlink(c->io, old_fi); + } + + if(old_s) { + hkvs_slab_discard(old_s, old_sid); + } + + return 0; + +wtf: + hkvs_die("record does not exist"); +} + +void hkvs_record_delete(hkvs *c, hkvs_table *t, hkvs_rid rid) { + if(rid.id > hkvs_slab_size(&t->slab)) goto wtf; + size_t id = (size_t)rid.id; + size_t key_size = hkvs_table__key_size(t); + uint8_t mark = hkvs_slab_mark(&t->slab, id); + if(mark == 0) goto wtf; + char *key = hkvs_slab_at(&t->slab, id); + char *p = key + key_size; + + uint64_t unlink_fi = 0; + hkvs_slab *s = NULL; + size_t sid; + + switch(mark) { + case HKVS_MARK_SLAB: + { + uint64_t val = hkvs_read_u64(p + 8); + uint8_t slab = val & 255; + val >>= 8; + + if(slab >= HKVS_N_SLABS) goto corrupted; + s = &c->slabs[slab]; + + if(val >= hkvs_slab_size(s)) goto corrupted; + sid = (size_t)val; + break; + } + + case HKVS_MARK_EXT: + { + uint64_t fi = hkvs_read_u64(p + 8); + if(fi == 0) goto corrupted; + + size_t slot; + if(hkvs_extfile_find(c, fi, &slot)) { + hkvs_extfile_close(c, slot); + } + + unlink_fi = fi; + break; + } + + default: + break; + } + + hkvs_ix_remove(t, id); + hkvs_slab_discard(&t->slab, id); + + if(unlink_fi) { + c->io->unlink(c->io, unlink_fi); + } + + if(s) { + hkvs_slab_discard(s, sid); + } + + return; + +corrupted: + hkvs_die("database corrupted"); + +wtf: + hkvs_die("record does not exist"); +} + +int hkvs_record_append(hkvs *c, hkvs_table *t, hkvs_rid *prid, const char *key, char **pvalue, size_t size) { + size_t key_size = hkvs_table__key_size(t); + uint8_t inl_size = hkvs_table__inl_size(t); + + size_t id; + int r = hkvs_slab_alloc(c, &t->slab, &id); + if(r < 0) return r; + + char *r_key = hkvs_slab_at(&t->slab, id); + memcpy(r_key, key, key_size); + + char *p = r_key + key_size; + char *mark = hkvs_slab_mark_at(&t->slab, id); + + hkvs_slab *s = NULL; + size_t sid; + char *value; + + uint64_t ext_fi = 0; + + if(size <= inl_size) { + hkvs_write_u8(mark, (uint8_t)(size + 1)); + value = p; + } else { + uint8_t slab = hkvs_slab_for_size(size); + hkvs_write_u64(p, size); + if(slab < HKVS_N_SLABS) { + s = &c->slabs[slab]; + r = hkvs_slab_alloc(c, s, &sid); + if(r < 0) return r; + value = hkvs_slab_at(s, sid); + hkvs_slab_set_mark(s, sid, 1); + + hkvs_write_u64(p + 8, (uint64_t)sid << 8 | slab); + hkvs_write_u8(mark, HKVS_MARK_SLAB); + } else { + char *addr; + r = hkvs_extfile_create(c, &ext_fi, &addr, size, !!pvalue); + value = addr; + + hkvs_write_u64(p + 8, ext_fi); + hkvs_write_u8(mark, HKVS_MARK_EXT); + } + } + + r = hkvs_ix_add(c, t, id); + if(r >= 0) { + if(prid) *prid = (hkvs_rid){id}; + if(pvalue) *pvalue = value; + return 1; + } + + hkvs_write_u8(mark, 0); + + if(s) { + hkvs_slab_discard(s, sid); + } + + if(ext_fi) { + size_t slot = --c->n_extfiles; + hkvs_io *io = c->io; + io->close(io, ext_fi, c->extfiles[slot].addr, c->extfiles[slot].size); + io->unlink(io, ext_fi); + } + + return r; +} |