aboutsummaryrefslogblamecommitdiff
path: root/lib/record.c
blob: c9c36d6c0fd43d85f1bfac23c864845c6ed219d2 (plain) (tree)































































































































































































































































































































































                                                                                                                                   
// SPDX-License-Identifier: LGPL-3.0-or-later

#include "impl.h"

/*
 * Return a pointer to the stored key of record `rid` in table `t`.
 *
 * Yields NULL when the id is larger than the table slab's size or when the
 * slot's mark byte is 0 (no record stored there). `c` is unused.
 *
 * NOTE(review): the bound is `>`, so an id equal to hkvs_slab_size() is
 * accepted — confirm that matches the slab's indexing convention.
 */
const char *hkvs_record_key(hkvs *c, hkvs_table *t, hkvs_rid rid) {
	(void)c;

	if(rid.id > hkvs_slab_size(&t->slab)) {
		return NULL;
	}

	const size_t slot = (size_t)rid.id;
	return hkvs_slab_mark(&t->slab, slot) ? hkvs_slab_at(&t->slab, slot) : NULL;
}

/*
 * Fetch the value of record `rid` in table `t`.
 *
 * Returns 0 when the id is past the table slab's size; otherwise forwards to
 * hkvs_record__value() with a "not found" result of 0, so the return value is
 * 0 (no record), 1 (found; *pvalue / *psize filled in when non-NULL) or a
 * negative error code.
 */
int hkvs_record_value(hkvs *c, hkvs_table *t, hkvs_rid rid, char **pvalue, size_t *psize) {
	if(rid.id > hkvs_slab_size(&t->slab)) {
		return 0;
	}

	const size_t slot = (size_t)rid.id;
	char *rec_key = hkvs_slab_at(&t->slab, slot);

	return hkvs_record__value(c, t, slot, rec_key, hkvs_table__key_size(t), 0, pvalue, psize);
}

/*
 * Resolve the value of the record in slot `id` of table `t`.
 *
 * `key` points at the record's key bytes and `key_size` is their length; the
 * value area starts right after the key. The slot's mark byte selects the
 * storage form:
 *   - 0: no record; `notfound` is returned.
 *   - 1..HKVS_MARK_MAX_SMALL: inline value of (mark - 1) bytes in the record.
 *   - otherwise the record holds a u64 size followed by a u64 reference, which
 *     is decoded for HKVS_MARK_EXT (external file id) or HKVS_MARK_SLAB
 *     (slab number in the low 8 bits, slot index in the remaining bits).
 *
 * When `pvalue` is NULL only the size is reported and the reference is not
 * decoded (so an unknown mark is not diagnosed in that case).
 *
 * Returns 1 on success, `notfound` for an empty slot, -EFBIG when the stored
 * size does not fit in size_t, -EBADMSG for an invalid mark or reference, or
 * the negative result of hkvs_extfile_open().
 */
int hkvs_record__value(hkvs *c, hkvs_table *t, size_t id, char *key, size_t key_size, int notfound, char **pvalue, size_t *psize) {
	const uint8_t mark = hkvs_slab_mark(&t->slab, id);
	if(!mark) return notfound;

	char *data = key + key_size;
	size_t len;

	if(mark <= HKVS_MARK_MAX_SMALL) {
		/* Inline value: the mark encodes length + 1. */
		len = mark - 1;
	} else {
		uint64_t stored_len = hkvs_read_u64(data);
		if(stored_len > SIZE_MAX) return -EFBIG;
		len = (size_t)stored_len;

		if(pvalue) {
			uint64_t ref = hkvs_read_u64(data + 8);

			if(mark == HKVS_MARK_EXT) {
				size_t slot;
				int rc = hkvs_extfile_open(c, ref, &slot, len);
				if(rc < 0) return rc;

				/* The caller receives a pointer into the file, so pin it. */
				hkvs_extfile *file = &c->extfiles[slot];
				hkvs_extfile_pin(c, file);
				data = file->addr;
			} else if(mark == HKVS_MARK_SLAB) {
				uint8_t slab_no = ref & 255;
				uint64_t slot64 = ref >> 8;

				if(slab_no >= HKVS_N_SLABS) return -EBADMSG;
				hkvs_slab *vs = &c->slabs[slab_no];
				if(len > vs->elt_size) return -EBADMSG;
				if(slot64 > hkvs_slab_size(vs)) return -EBADMSG;

				data = hkvs_slab_at(vs, (size_t)slot64);
			} else {
				return -EBADMSG;
			}
		}
	}

	if(pvalue) *pvalue = data;
	if(psize) *psize = len;
	return 1;
}

/*
 * Resize the value of record `rid` in table `t` to `new_size` bytes,
 * preserving at most the first `keep_size` bytes of the old value (clamped to
 * both the old and the new size).
 *
 * Storage is chosen from the new size: inline in the record when it fits in
 * the table's inline size, otherwise a value slab picked by
 * hkvs_slab_for_size(), otherwise an external file. Where possible the
 * existing storage is reused in place (inline -> inline, same slab class, or
 * an external file resized through c->io->resize).
 *
 * If `pvalue` is non-NULL it receives a pointer to the (new) value bytes.
 *
 * Returns 0 on success, -EFBIG if the stored size does not fit in size_t,
 * -EBADMSG if the record's mark or reference is invalid, or a negative error
 * from the slab / external-file helpers. Calls hkvs_die() when the record
 * does not exist.
 */
int hkvs_record_resize(hkvs *c, hkvs_table *t, hkvs_rid rid, char **pvalue, size_t new_size, size_t keep_size) {
	/* presumably releases previously handed-out record pointers — confirm */
	hkvs_put_records(c);

	if(rid.id > hkvs_slab_size(&t->slab)) goto wtf;
	size_t id = (size_t)rid.id;
	size_t key_size = hkvs_table__key_size(t);
	uint8_t inl_size = hkvs_table__inl_size(t);
	uint8_t mark = hkvs_slab_mark(&t->slab, id);
	if(mark == 0) goto wtf;
	char *key = hkvs_slab_at(&t->slab, id);
	/* Value area: inline bytes, or a u64 size followed by a u64 reference. */
	char *p = key + key_size;

	uint8_t new_slab = hkvs_slab_for_size(new_size);

	/* Old external file (if any) to close and unlink once data is moved. */
	uint64_t old_fi = 0;
	size_t old_fi_slot;

	/* Old value-slab slot (if any) to discard once data is moved. */
	hkvs_slab *old_s = NULL;
	size_t old_sid;

	char *old_value;
	size_t old_size;

	/*
	 * Inline marks run up to and including HKVS_MARK_MAX_SMALL, matching the
	 * test in hkvs_record__value(); using `<` here would send the largest
	 * inline record down the header path and fail with -EBADMSG.
	 */
	if(mark <= HKVS_MARK_MAX_SMALL) {
		if(new_size <= inl_size) {
			/* Still inline: only the length encoded in the mark changes. */
			hkvs_slab_set_mark(&t->slab, id, (uint8_t)(new_size + 1));
			if(pvalue) *pvalue = p;
			return 0;
		}
		old_value = p;
		old_size = mark - 1;
	} else {
		uint64_t val = hkvs_read_u64(p);
		if(val > SIZE_MAX) return -EFBIG;
		old_size = (size_t)val;
		val = hkvs_read_u64(p + 8);

		switch(mark) {
			case HKVS_MARK_EXT:
			{
				old_fi = val;

				int r = hkvs_extfile_open(c, old_fi, &old_fi_slot, old_size);
				if(r < 0) return r;
				hkvs_extfile *f = &c->extfiles[old_fi_slot];
				old_value = f->addr;

				/*
				 * Stays external: resize the existing file in place.
				 * NOTE(review): the in-place path is taken when keep_size is
				 * at or below HKVS_REWRITE_THRESHOLD — confirm the direction
				 * of this comparison is intended.
				 */
				if(new_slab >= HKVS_N_SLABS && keep_size <= HKVS_REWRITE_THRESHOLD) {
					r = c->io->resize(c->io, old_fi, &f->addr, f->size, new_size);
					if(r < 0) return r;
					f->size = new_size;
					hkvs_write_u64(p, new_size);
					if(pvalue) {
						hkvs_extfile_pin(c, f);
						*pvalue = f->addr;
					}
					return 0;
				}

				break;
			}

			case HKVS_MARK_SLAB:
			{
				/* Reference layout: slab number in bits 0-7, slot above. */
				uint8_t slab = val & 255;
				val >>= 8;

				if(slab >= HKVS_N_SLABS) return -EBADMSG;
				old_s = &c->slabs[slab];
				if(old_size > old_s->elt_size) return -EBADMSG;

				if(val > hkvs_slab_size(old_s)) return -EBADMSG;
				old_sid = (size_t)val;

				old_value = hkvs_slab_at(old_s, old_sid);

				if(slab == new_slab) {
					/* Same slab class: the slot is reused, only the size changes. */
					hkvs_write_u64(p, new_size);
					if(pvalue) *pvalue = old_value;
					return 0;
				}
				break;
			}

			default:
				return -EBADMSG;
		}
	}

	if(old_size < keep_size) keep_size = old_size;
	if(new_size < keep_size) keep_size = new_size;

	char *new_value;

	/*
	 * The record header (size + reference) shares its bytes with the inline
	 * value. New storage is therefore obtained and the kept bytes are copied
	 * BEFORE the header and mark are rewritten: this keeps an old inline
	 * value intact until it has been copied, and leaves the record untouched
	 * if an allocation fails.
	 */
	if(new_size <= inl_size) {
		/* Old value lives in a slab or external file, so no overlap with p. */
		new_value = p;
		memcpy(new_value, old_value, keep_size);
		hkvs_slab_set_mark(&t->slab, id, (uint8_t)(new_size + 1));
	} else if(new_slab < HKVS_N_SLABS) {
		hkvs_slab *s = &c->slabs[new_slab];
		size_t sid;
		int r = hkvs_slab_alloc(c, s, &sid);
		if(r < 0) return r;
		hkvs_slab_set_mark(s, sid, 1);
		new_value = hkvs_slab_at(s, sid);
		memcpy(new_value, old_value, keep_size);
		hkvs_write_u64(p, new_size);
		hkvs_write_u64(p + 8, ((uint64_t)sid << 8) | new_slab);
		hkvs_slab_set_mark(&t->slab, id, HKVS_MARK_SLAB);
	} else {
		char *addr;
		uint64_t new_fi;
		/* Last argument is !!pvalue — presumably a "pin" request; confirm. */
		int r = hkvs_extfile_create(c, &new_fi, &addr, new_size, !!pvalue);
		if(r < 0) return r;
		new_value = addr;
		memcpy(new_value, old_value, keep_size);
		hkvs_write_u64(p, new_size);
		hkvs_write_u64(p + 8, new_fi);
		hkvs_slab_set_mark(&t->slab, id, HKVS_MARK_EXT);
	}

	if(pvalue) *pvalue = new_value;

	/* Release whatever storage the old value occupied. */
	if(old_fi) {
		/* NOTE(review): assumes old_fi_slot is still valid after hkvs_extfile_create() — confirm. */
		hkvs_extfile_close(c, old_fi_slot);
		c->io->unlink(c->io, old_fi);
	}

	if(old_s) {
		hkvs_slab_discard(old_s, old_sid);
	}

	return 0;

wtf:
	hkvs_die("record does not exist");
}

/*
 * Delete record `rid` from table `t`.
 *
 * The record is removed from the table index and its slot discarded; any
 * out-of-line value storage is released afterwards: a value-slab slot is
 * discarded, an external file is closed (if currently open) and unlinked.
 *
 * Calls hkvs_die() when the record does not exist or when its stored
 * reference is invalid.
 */
void hkvs_record_delete(hkvs *c, hkvs_table *t, hkvs_rid rid) {
	if(rid.id > hkvs_slab_size(&t->slab)) goto wtf;

	size_t id = (size_t)rid.id;
	size_t key_len = hkvs_table__key_size(t);
	uint8_t mark = hkvs_slab_mark(&t->slab, id);
	if(mark == 0) goto wtf;

	char *rec = hkvs_slab_at(&t->slab, id);
	/* Header after the key: u64 size, then u64 reference at offset 8. */
	char *hdr = rec + key_len;

	uint64_t ext_to_unlink = 0;
	hkvs_slab *val_slab = NULL;
	size_t val_slot;

	if(mark == HKVS_MARK_SLAB) {
		uint64_t ref = hkvs_read_u64(hdr + 8);
		uint8_t slab_no = ref & 255;
		uint64_t slot64 = ref >> 8;

		if(slab_no >= HKVS_N_SLABS) goto corrupted;
		val_slab = &c->slabs[slab_no];

		/*
		 * NOTE(review): this bound is `>=`, whereas the other slab bound
		 * checks in this file use `>` — confirm which one is intended.
		 */
		if(slot64 >= hkvs_slab_size(val_slab)) goto corrupted;
		val_slot = (size_t)slot64;
	} else if(mark == HKVS_MARK_EXT) {
		uint64_t file_id = hkvs_read_u64(hdr + 8);
		if(file_id == 0) goto corrupted;

		/* Close the file first if it is currently open. */
		size_t open_slot;
		if(hkvs_extfile_find(c, file_id, &open_slot)) {
			hkvs_extfile_close(c, open_slot);
		}

		ext_to_unlink = file_id;
	}
	/* Any other mark: nothing beyond the record itself to release. */

	hkvs_ix_remove(t, id);
	hkvs_slab_discard(&t->slab, id);

	if(ext_to_unlink) {
		c->io->unlink(c->io, ext_to_unlink);
	}

	if(val_slab) {
		hkvs_slab_discard(val_slab, val_slot);
	}

	return;

corrupted:
	hkvs_die("database corrupted");

wtf:
	hkvs_die("record does not exist");
}

/*
 * Append a new record with key `key` and a value of `size` bytes to table `t`.
 *
 * The key (hkvs_table__key_size(t) bytes) is copied into a freshly allocated
 * table slot. The value is stored inline when `size` fits in the table's
 * inline size, otherwise in a value slab chosen by hkvs_slab_for_size(), or
 * in a new external file when no slab class fits. The value bytes are not
 * written here; `*pvalue` (when non-NULL) receives a pointer to them.
 *
 * On success returns 1 and stores the new record id in `*prid` (when
 * non-NULL). On failure returns the negative error from the failing helper;
 * if adding to the index fails, the value storage obtained here is released
 * and the slot's mark is reset to 0.
 */
int hkvs_record_append(hkvs *c, hkvs_table *t, hkvs_rid *prid, const char *key, char **pvalue, size_t size) {
	size_t key_size = hkvs_table__key_size(t);
	uint8_t inl_size = hkvs_table__inl_size(t);

	size_t id;
	int r = hkvs_slab_alloc(c, &t->slab, &id);
	if(r < 0) return r;

	char *r_key = hkvs_slab_at(&t->slab, id);
	memcpy(r_key, key, key_size);

	/* Value area follows the key; the mark byte records the storage form. */
	char *p = r_key + key_size;
	char *mark = hkvs_slab_mark_at(&t->slab, id);

	hkvs_slab *s = NULL;
	size_t sid;
	char *value;

	uint64_t ext_fi = 0;

	if(size <= inl_size) {
		/* Inline value: the mark encodes length + 1. */
		hkvs_write_u8(mark, (uint8_t)(size + 1));
		value = p;
	} else {
		uint8_t slab = hkvs_slab_for_size(size);
		hkvs_write_u64(p, size);
		if(slab < HKVS_N_SLABS) {
			s = &c->slabs[slab];
			r = hkvs_slab_alloc(c, s, &sid);
			if(r < 0) return r;
			value = hkvs_slab_at(s, sid);
			hkvs_slab_set_mark(s, sid, 1);

			/* Reference layout: slab number in bits 0-7, slot above. */
			hkvs_write_u64(p + 8, (uint64_t)sid << 8 | slab);
			hkvs_write_u8(mark, HKVS_MARK_SLAB);
		} else {
			char *addr;
			r = hkvs_extfile_create(c, &ext_fi, &addr, size, !!pvalue);
			/*
			 * Bail out before touching `addr` / `ext_fi`: on failure they
			 * may not have been set, and the error must not be lost to the
			 * hkvs_ix_add() call below. The record's mark has not been
			 * written yet, same as on the slab-allocation failure path.
			 */
			if(r < 0) return r;
			value = addr;

			hkvs_write_u64(p + 8, ext_fi);
			hkvs_write_u8(mark, HKVS_MARK_EXT);
		}
	}

	r = hkvs_ix_add(c, t, id);
	if(r >= 0) {
		if(prid) *prid = (hkvs_rid){id};
		if(pvalue) *pvalue = value;
		return 1;
	}

	/* Index insertion failed: undo the record and release value storage. */
	hkvs_write_u8(mark, 0);

	if(s) {
		hkvs_slab_discard(s, sid);
	}

	if(ext_fi) {
		/* Assumes the file created above occupies the last extfile slot — confirm. */
		size_t slot = --c->n_extfiles;
		hkvs_io *io = c->io;
		io->close(io, ext_fi, c->extfiles[slot].addr, c->extfiles[slot].size);
		io->unlink(io, ext_fi);
	}

	return r;
}