From a2b2fe7da78378dc6a64f58c14e907496c9adea9 Mon Sep 17 00:00:00 2001 From: Hristo Venev Date: Thu, 26 Aug 2021 17:30:35 +0300 Subject: Initial commit --- patchstate.py | 383 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 383 insertions(+) create mode 100644 patchstate.py (limited to 'patchstate.py') diff --git a/patchstate.py b/patchstate.py new file mode 100644 index 0000000..79600e0 --- /dev/null +++ b/patchstate.py @@ -0,0 +1,383 @@ +import io, sys, os, operator, re, base64, hashlib +import patchtheory + +def strip_subject(s): + if not s.startswith(b'[PATCH'): + raise ValueError(f'Invalid subject: {s!r}') + i = s.find(b'] ') + if i == -1: + raise ValueError(f'Invalid subject: {s!r}') + return s[i+2:] + +rem_patch = re.compile('patch ([0-9a-z]{16})').fullmatch +rem_author = re.compile('[^\0-\x1f]+ <[^\0-\x1f]+@[^\0-\x1f]+>').fullmatch +rem_date = re.compile('[^\0-\x1f]+').fullmatch +rem_title = re.compile('[^\0-\x1f]+').fullmatch + +class Patch: + __slots__ = ('author', 'date', 'title', 'body', 'diff', 'data', 'hash', 'id') + + def __init__(self, /, *, author, date, title, body, diff): + assert type(author) is str and rem_author(author) is not None + assert type(date) is str and rem_date(date) is not None + assert type(title) is str and rem_title(title) is not None + assert type(body) is str + assert type(diff) is patchtheory.Diff + + self.author = author + self.date = date + self.title = title + self.body = body + self.diff = diff + + self._set_data() + self._set_id() + + @classmethod + def parse(tp, body): + lines = iter(body.split(b'\n')) + line = lines.__next__ + + l = line() + if l.startswith(b'From '): + l = line() + + headers = {} + hdr = None + while True: + if not l: + break + if hdr is not None and l.startswith(b' '): + headers[hdr] += l + else: + hdr,sep,val = l.partition(b':') + if not sep: + raise ValueError(f'Expected header line: {l!r}') + headers[hdr] = val.lstrip() + l = line() + + author = headers.pop(b'From').decode('utf-8') + date = headers.pop(b'Date').decode('utf-8') + title = strip_subject(headers.pop(b'Subject')).decode('utf-8') + + headers.pop(b'MIME-Version', None) + headers.pop(b'Content-Type', None) + headers.pop(b'Content-Transfer-Encoding', None) + + body = [] + for l in lines: + if l.startswith(b'diff --git'): + break + body.append(l) + else: + raise RuntimeError('unexpected EOF') + while not body[-1]: + del body[-1] + while body[-1].startswith(b' '): + del body[-1] + if body[-1] != b'---': + raise RuntimeError + del body[-1] + while body and not body[-1]: + del body[-1] + + body = b'\n'.join(body).decode('utf-8') + + return Patch( + author = author, + date = date, + title = title, + body = body, + diff = patchtheory.Diff.parse_l(l, line), + ) + + def _set_id(self): + out = io.BytesIO() + w = out.write + w(self.author.encode('utf-8')) + w(b'\n') + w(self.date.encode('utf-8')) + w(b'\n') + w(self.title.encode('utf-8')) + w(b'\n') + self.diff.write_munged(w) + + out = out.getvalue() + dgst = hashlib.blake2b(out).digest() + self.id = base64.b32encode(dgst[:10]).lower().decode('ascii') + + def as_filename(self): + return '-'.join(re.findall('[0-9A-Za-z]+', self.title))[:72] + + def _set_data(self): + out = io.BytesIO() + w = out.write + + w(b'From: ') + w(self.author.encode('utf-8')) + w(b'\nDate: ') + w(self.date.encode('utf-8')) + w(b'\nSubject: [PATCH] ') + w(self.title.encode('utf-8')) + w(b'\n\n') + w(self.body.encode('utf-8')) + w(b'\n---\n\n') + self.diff.write(w) + w(b'-- \n0.0.0 patchstate\n\n') + + self.data = data = out.getvalue() + + self.hash = base64.b32encode(hashlib.blake2b(data).digest()[:20]).lower().decode('ascii') + + def revert(self): + r = Patch( + author = self.author, + date = self.date, + title = f'Revert "{self.title}"', + body = self.body, + diff = -self.diff, + ) + + r._set_data() + r._set_id() + return r + +class PatchInfo: + __slots__ = ('patch_id', '_body') + + def __init__(self, patch_id, info): + self.patch_id = patch_id + self._body = info + + def get_patch(self, repo): + p = repo.patches[self.patch_hash] + if p.id != self.patch_id: + raise KeyError(f'patch id mismatch: {self.patch_id!r} -> {p.id!r}') + if p.title != self.patch_title: + raise KeyError(f'patch title mismatch: {self.patch_title!r} -> {p.title!r}') + return p + + @property + def patch_hash(self): + return self._body[0] + + @property + def patch_title(self): + return self._body[1] + + @property + def mode(self): + return self._body[2] + + @property + def info(self): + return self._body[3:] + + def update(self, *, patch=None, patch_id=None, patch_hash=None, patch_title=None, mode=None, new_mode=None, info=None): + if patch_title is None: + patch_title = self.patch_title + + if patch is not None: + assert patch_id is None + assert patch_hash is None + if patch_title != patch.title: + raise ValueError('patch title change: {patch_title!r} -> {patch.title!r}') + patch_id = patch.id + patch_hash = patch.hash + + else: + if patch_id is None: + patch_id = self.patch_id + + if patch_hash is None: + patch_hash = self.patch_hash + + if info is None: + info = self.info + + if mode is None: + mode = self.mode + + if new_mode is not None and new_mode != mode: + info.insert(0, f'(was {mode})') + mode = new_mode + + return PatchInfo(patch_id, [patch_hash, patch_title, mode, *info]) + +class Series: + __slots__ = ('info',) + + def __init__(self, /, info=None): + if info is None: + info = [] + self.info = info + + @classmethod + def parse(tp, data): + inf = [] + + lines = iter(data.split('\n')) + l = next(lines) + while True: + if not l: + try: + l = next(lines) + except StopIteration: + break + continue + m = rem_patch(l) + if m is None: + raise ValueError(f'Invalid header: {l!r}') + pid = m.group(1) + ldata = [] + while True: + l = next(lines) + if not l.startswith('\t'): + break + ldata.append(l[1:]) + if len(ldata) < 3: + raise ValueError(f'Expected data after {f"patch {pid}"!r}') + + while ldata and not ldata[-1]: + del ldata[-1] + + inf.append(PatchInfo(pid, ldata)) + + return tp(info=inf) + + def fmt(self): + out = io.StringIO() + w = out.write + + for pi in self.info: + w(f'patch {pi.patch_id}') + for line in pi._body: + w('\n\t') + w(line) + w('\n') + + return out.getvalue() + +class Repository: + __slots__ = ('path', 'patches', 'aliases', 'by_id') + + def __init__(self, path): + self.path = path + self.patches = {} + self.by_id = {} + + aliases = {} + for fn in os.listdir(path): + if not fn.endswith('.patch'): + continue + fp = os.path.join(path, fn) + with io.open(fp, 'rb') as f: + data = f.read() + + try: + p = Patch.parse(data) + except ValueError as exc: + print(f'Failed to parse {fn!r}: {exc}', file=sys.stderr) + os.unlink(fp) + continue + + h = p.hash + if fn != f'{h}.patch': + aliases[fn[:-6]] = (h,p) + else: + self.patches[h] = p + self.by_id.setdefault(p.id, []).append(p) + + for h,(h2,p) in aliases.items(): + assert h != h2 + assert h not in self.patches + if h2 in self.patches: + continue + print(f'Migrating patch: {h!r} -> {h2!r}', file=sys.stderr) + self.patches[h2] = p + with io.open(os.path.join(self.path, f'{h2}.patch'), 'xb') as f: + f.write(p.data) + + self.aliases = {h: h2 for h,(h2,p) in aliases.items()} + + def hash_data(self, data): + data = hashlib.blake2b(data).digest() + return base64.b32encode(data[:20]).lower().decode() + + def insert_patch(self, patch): + p = self.patches.setdefault(patch.hash, patch) + if p is patch: + pid = p.id + try: + pd = self.by_id[pid] + except KeyError: + self.by_id[pid] = [p] + else: + print(f'Duplicate patch ID: {pid!r} {p.title}', file=sys.stderr) + pd.append(p) + + with io.open(os.path.join(self.path, f'{p.hash}.patch'), 'xb') as f: + f.write(p.data) + + return p + + def import_dir(self, path): + r = Series() + r.info = inf = [] + + for fn in sorted(os.listdir(path)): + if not fn.endswith('.patch'): + print(f'Skipping {fn!r}', file=sys.stderr) + continue + with io.open(os.path.join(path, fn), 'rb') as f: + patch = Patch.parse(f.read()) + + patch = self.insert_patch(patch) + inf.append(PatchInfo(patch.id, [patch.hash, patch.title, 'new'])) + + return r + + def gc(self, keep): + for h in self.patches.keys() - keep: + del self.patches[h] + print(f'Removing patch: {h!r}', file=sys.stderr) + os.unlink(os.path.join(self.path, f'{h}.patch')) + +def argparse_next(args, func=...): + def do_argparse(func): + while True: + try: + arg = next(args) + except StopIteration: + return None + if arg == '--': + return next(args, None) + if not arg.startswith('-'): + return arg + arg = func(arg) + if arg is not None: + return arg + if func is not ...: + return do_argparse(func) + return do_argparse + +def argparse_all(args, func=...): + def do_argparse(func): + r = [] + while True: + try: + arg = next(args) + except StopIteration: + return r + if arg == '--': + r.extend(args) + return r + if not arg.startswith('-'): + r.append(arg) + continue + arg = func(arg) + if arg is not None: + r.extend(arg) + if func is not ...: + return do_argparse(func) + return do_argparse -- cgit