summaryrefslogtreecommitdiff
path: root/patchstate.py
diff options
context:
space:
mode:
Diffstat (limited to 'patchstate.py')
-rw-r--r--patchstate.py383
1 files changed, 383 insertions, 0 deletions
diff --git a/patchstate.py b/patchstate.py
new file mode 100644
index 0000000..79600e0
--- /dev/null
+++ b/patchstate.py
@@ -0,0 +1,383 @@
+import io, sys, os, operator, re, base64, hashlib
+import patchtheory
+
+def strip_subject(s):
+ if not s.startswith(b'[PATCH'):
+ raise ValueError(f'Invalid subject: {s!r}')
+ i = s.find(b'] ')
+ if i == -1:
+ raise ValueError(f'Invalid subject: {s!r}')
+ return s[i+2:]
+
+rem_patch = re.compile('patch ([0-9a-z]{16})').fullmatch
+rem_author = re.compile('[^\0-\x1f]+ <[^\0-\x1f]+@[^\0-\x1f]+>').fullmatch
+rem_date = re.compile('[^\0-\x1f]+').fullmatch
+rem_title = re.compile('[^\0-\x1f]+').fullmatch
+
+class Patch:
+ __slots__ = ('author', 'date', 'title', 'body', 'diff', 'data', 'hash', 'id')
+
+ def __init__(self, /, *, author, date, title, body, diff):
+ assert type(author) is str and rem_author(author) is not None
+ assert type(date) is str and rem_date(date) is not None
+ assert type(title) is str and rem_title(title) is not None
+ assert type(body) is str
+ assert type(diff) is patchtheory.Diff
+
+ self.author = author
+ self.date = date
+ self.title = title
+ self.body = body
+ self.diff = diff
+
+ self._set_data()
+ self._set_id()
+
+ @classmethod
+ def parse(tp, body):
+ lines = iter(body.split(b'\n'))
+ line = lines.__next__
+
+ l = line()
+ if l.startswith(b'From '):
+ l = line()
+
+ headers = {}
+ hdr = None
+ while True:
+ if not l:
+ break
+ if hdr is not None and l.startswith(b' '):
+ headers[hdr] += l
+ else:
+ hdr,sep,val = l.partition(b':')
+ if not sep:
+ raise ValueError(f'Expected header line: {l!r}')
+ headers[hdr] = val.lstrip()
+ l = line()
+
+ author = headers.pop(b'From').decode('utf-8')
+ date = headers.pop(b'Date').decode('utf-8')
+ title = strip_subject(headers.pop(b'Subject')).decode('utf-8')
+
+ headers.pop(b'MIME-Version', None)
+ headers.pop(b'Content-Type', None)
+ headers.pop(b'Content-Transfer-Encoding', None)
+
+ body = []
+ for l in lines:
+ if l.startswith(b'diff --git'):
+ break
+ body.append(l)
+ else:
+ raise RuntimeError('unexpected EOF')
+ while not body[-1]:
+ del body[-1]
+ while body[-1].startswith(b' '):
+ del body[-1]
+ if body[-1] != b'---':
+ raise RuntimeError
+ del body[-1]
+ while body and not body[-1]:
+ del body[-1]
+
+ body = b'\n'.join(body).decode('utf-8')
+
+ return Patch(
+ author = author,
+ date = date,
+ title = title,
+ body = body,
+ diff = patchtheory.Diff.parse_l(l, line),
+ )
+
+ def _set_id(self):
+ out = io.BytesIO()
+ w = out.write
+ w(self.author.encode('utf-8'))
+ w(b'\n')
+ w(self.date.encode('utf-8'))
+ w(b'\n')
+ w(self.title.encode('utf-8'))
+ w(b'\n')
+ self.diff.write_munged(w)
+
+ out = out.getvalue()
+ dgst = hashlib.blake2b(out).digest()
+ self.id = base64.b32encode(dgst[:10]).lower().decode('ascii')
+
+ def as_filename(self):
+ return '-'.join(re.findall('[0-9A-Za-z]+', self.title))[:72]
+
+ def _set_data(self):
+ out = io.BytesIO()
+ w = out.write
+
+ w(b'From: ')
+ w(self.author.encode('utf-8'))
+ w(b'\nDate: ')
+ w(self.date.encode('utf-8'))
+ w(b'\nSubject: [PATCH] ')
+ w(self.title.encode('utf-8'))
+ w(b'\n\n')
+ w(self.body.encode('utf-8'))
+ w(b'\n---\n\n')
+ self.diff.write(w)
+ w(b'-- \n0.0.0 patchstate\n\n')
+
+ self.data = data = out.getvalue()
+
+ self.hash = base64.b32encode(hashlib.blake2b(data).digest()[:20]).lower().decode('ascii')
+
+ def revert(self):
+ r = Patch(
+ author = self.author,
+ date = self.date,
+ title = f'Revert "{self.title}"',
+ body = self.body,
+ diff = -self.diff,
+ )
+
+ r._set_data()
+ r._set_id()
+ return r
+
+class PatchInfo:
+ __slots__ = ('patch_id', '_body')
+
+ def __init__(self, patch_id, info):
+ self.patch_id = patch_id
+ self._body = info
+
+ def get_patch(self, repo):
+ p = repo.patches[self.patch_hash]
+ if p.id != self.patch_id:
+ raise KeyError(f'patch id mismatch: {self.patch_id!r} -> {p.id!r}')
+ if p.title != self.patch_title:
+ raise KeyError(f'patch title mismatch: {self.patch_title!r} -> {p.title!r}')
+ return p
+
+ @property
+ def patch_hash(self):
+ return self._body[0]
+
+ @property
+ def patch_title(self):
+ return self._body[1]
+
+ @property
+ def mode(self):
+ return self._body[2]
+
+ @property
+ def info(self):
+ return self._body[3:]
+
+ def update(self, *, patch=None, patch_id=None, patch_hash=None, patch_title=None, mode=None, new_mode=None, info=None):
+ if patch_title is None:
+ patch_title = self.patch_title
+
+ if patch is not None:
+ assert patch_id is None
+ assert patch_hash is None
+ if patch_title != patch.title:
+ raise ValueError('patch title change: {patch_title!r} -> {patch.title!r}')
+ patch_id = patch.id
+ patch_hash = patch.hash
+
+ else:
+ if patch_id is None:
+ patch_id = self.patch_id
+
+ if patch_hash is None:
+ patch_hash = self.patch_hash
+
+ if info is None:
+ info = self.info
+
+ if mode is None:
+ mode = self.mode
+
+ if new_mode is not None and new_mode != mode:
+ info.insert(0, f'(was {mode})')
+ mode = new_mode
+
+ return PatchInfo(patch_id, [patch_hash, patch_title, mode, *info])
+
+class Series:
+ __slots__ = ('info',)
+
+ def __init__(self, /, info=None):
+ if info is None:
+ info = []
+ self.info = info
+
+ @classmethod
+ def parse(tp, data):
+ inf = []
+
+ lines = iter(data.split('\n'))
+ l = next(lines)
+ while True:
+ if not l:
+ try:
+ l = next(lines)
+ except StopIteration:
+ break
+ continue
+ m = rem_patch(l)
+ if m is None:
+ raise ValueError(f'Invalid header: {l!r}')
+ pid = m.group(1)
+ ldata = []
+ while True:
+ l = next(lines)
+ if not l.startswith('\t'):
+ break
+ ldata.append(l[1:])
+ if len(ldata) < 3:
+ raise ValueError(f'Expected data after {f"patch {pid}"!r}')
+
+ while ldata and not ldata[-1]:
+ del ldata[-1]
+
+ inf.append(PatchInfo(pid, ldata))
+
+ return tp(info=inf)
+
+ def fmt(self):
+ out = io.StringIO()
+ w = out.write
+
+ for pi in self.info:
+ w(f'patch {pi.patch_id}')
+ for line in pi._body:
+ w('\n\t')
+ w(line)
+ w('\n')
+
+ return out.getvalue()
+
+class Repository:
+ __slots__ = ('path', 'patches', 'aliases', 'by_id')
+
+ def __init__(self, path):
+ self.path = path
+ self.patches = {}
+ self.by_id = {}
+
+ aliases = {}
+ for fn in os.listdir(path):
+ if not fn.endswith('.patch'):
+ continue
+ fp = os.path.join(path, fn)
+ with io.open(fp, 'rb') as f:
+ data = f.read()
+
+ try:
+ p = Patch.parse(data)
+ except ValueError as exc:
+ print(f'Failed to parse {fn!r}: {exc}', file=sys.stderr)
+ os.unlink(fp)
+ continue
+
+ h = p.hash
+ if fn != f'{h}.patch':
+ aliases[fn[:-6]] = (h,p)
+ else:
+ self.patches[h] = p
+ self.by_id.setdefault(p.id, []).append(p)
+
+ for h,(h2,p) in aliases.items():
+ assert h != h2
+ assert h not in self.patches
+ if h2 in self.patches:
+ continue
+ print(f'Migrating patch: {h!r} -> {h2!r}', file=sys.stderr)
+ self.patches[h2] = p
+ with io.open(os.path.join(self.path, f'{h2}.patch'), 'xb') as f:
+ f.write(p.data)
+
+ self.aliases = {h: h2 for h,(h2,p) in aliases.items()}
+
+ def hash_data(self, data):
+ data = hashlib.blake2b(data).digest()
+ return base64.b32encode(data[:20]).lower().decode()
+
+ def insert_patch(self, patch):
+ p = self.patches.setdefault(patch.hash, patch)
+ if p is patch:
+ pid = p.id
+ try:
+ pd = self.by_id[pid]
+ except KeyError:
+ self.by_id[pid] = [p]
+ else:
+ print(f'Duplicate patch ID: {pid!r} {p.title}', file=sys.stderr)
+ pd.append(p)
+
+ with io.open(os.path.join(self.path, f'{p.hash}.patch'), 'xb') as f:
+ f.write(p.data)
+
+ return p
+
+ def import_dir(self, path):
+ r = Series()
+ r.info = inf = []
+
+ for fn in sorted(os.listdir(path)):
+ if not fn.endswith('.patch'):
+ print(f'Skipping {fn!r}', file=sys.stderr)
+ continue
+ with io.open(os.path.join(path, fn), 'rb') as f:
+ patch = Patch.parse(f.read())
+
+ patch = self.insert_patch(patch)
+ inf.append(PatchInfo(patch.id, [patch.hash, patch.title, 'new']))
+
+ return r
+
+ def gc(self, keep):
+ for h in self.patches.keys() - keep:
+ del self.patches[h]
+ print(f'Removing patch: {h!r}', file=sys.stderr)
+ os.unlink(os.path.join(self.path, f'{h}.patch'))
+
+def argparse_next(args, func=...):
+ def do_argparse(func):
+ while True:
+ try:
+ arg = next(args)
+ except StopIteration:
+ return None
+ if arg == '--':
+ return next(args, None)
+ if not arg.startswith('-'):
+ return arg
+ arg = func(arg)
+ if arg is not None:
+ return arg
+ if func is not ...:
+ return do_argparse(func)
+ return do_argparse
+
+def argparse_all(args, func=...):
+ def do_argparse(func):
+ r = []
+ while True:
+ try:
+ arg = next(args)
+ except StopIteration:
+ return r
+ if arg == '--':
+ r.extend(args)
+ return r
+ if not arg.startswith('-'):
+ r.append(arg)
+ continue
+ arg = func(arg)
+ if arg is not None:
+ r.extend(arg)
+ if func is not ...:
+ return do_argparse(func)
+ return do_argparse