import io, sys, os, operator, re, base64, hashlib
import patchtheory
def strip_subject(s):
    """Return what follows the ``[PATCH ...] `` tag of a subject line.

    *s* is the subject as bytes.  It has to start with ``[PATCH`` and contain
    the two-byte terminator ``'] '``; everything after the first terminator
    is returned unchanged.

    Raises:
        ValueError: if the prefix or the terminator is missing.
    """
    # Only look for the terminator when the mandatory prefix is present.
    tag_end = s.find(b'] ') if s.startswith(b'[PATCH') else -1
    if tag_end < 0:
        raise ValueError(f'Invalid subject: {s!r}')
    return s[tag_end + 2:]
# Full-string matchers used to validate series headers and patch metadata.
# Each name is the bound ``fullmatch`` method of a compiled pattern, so it is
# called as ``rem_x(text)`` and returns a match object or ``None``.
# The character class ``[^\0-\x1f]`` rejects NUL and every other control
# character up to 0x1f (which includes tab and newline).
rem_patch, rem_author, rem_date, rem_title = (
    re.compile(pattern).fullmatch
    for pattern in (
        'patch ([0-9a-z]{16})',                       # "patch <16-char id>"
        '[^\0-\x1f]+ <[^\0-\x1f]+@[^\0-\x1f]+>',      # "Name <local@domain>"
        '[^\0-\x1f]+',                                # date: one non-empty line
        '[^\0-\x1f]+',                                # title: one non-empty line
    )
)
class Patch:
    """One patch: author, date, title, free-text body and a diff.

    Besides the five input fields every instance carries three derived ones:

    * ``data`` -- the serialised mail-style representation (bytes),
    * ``hash`` -- base32 (lower case) of the first 20 bytes of the BLAKE2b
      digest of ``data``,
    * ``id``   -- base32 (lower case) of the first 10 bytes of the BLAKE2b
      digest of author, date, title and the diff's "munged" form; the body
      text does not take part in it.
    """
    __slots__ = ('author', 'date', 'title', 'body', 'diff', 'data', 'hash', 'id')

    def __init__(self, /, *, author, date, title, body, diff):
        """Store the fields and compute ``data``/``hash``/``id``.

        All arguments are keyword-only.  ``author``, ``date`` and ``title``
        must be ``str`` and fully match ``rem_author``/``rem_date``/
        ``rem_title``; ``body`` must be ``str``; ``diff`` must be exactly a
        ``patchtheory.Diff``.

        Raises:
            AssertionError: if an argument fails validation.  The checks are
                ``assert`` statements, so they are skipped under ``python -O``.
        """
        assert type(author) is str and rem_author(author) is not None
        assert type(date) is str and rem_date(date) is not None
        assert type(title) is str and rem_title(title) is not None
        assert type(body) is str
        assert type(diff) is patchtheory.Diff
        self.author = author
        self.date = date
        self.title = title
        self.body = body
        self.diff = diff
        self._set_data()
        self._set_id()

    @classmethod
    def parse(tp, body):
        """Build a patch from its serialised mail-style form.

        Expected layout of *body* (bytes): an optional leading ``From `` line,
        ``Name: value`` headers (lines starting with a space continue the
        previous header), a blank line, the message text, a ``---`` line,
        optional space-indented lines, and finally the diff starting at the
        first ``diff --git`` line.  The layout resembles ``git format-patch``
        output.

        Raises:
            ValueError: malformed header line or subject (this also covers
                ``UnicodeDecodeError`` from non-UTF-8 text, which is a
                ``ValueError`` subclass).
            KeyError: a required ``From``/``Date``/``Subject`` header is absent.
            RuntimeError: the input ends before the diff starts, or the
                ``---`` separator is missing.
        """
        lines = iter(body.split(b'\n'))
        # The raw ``__next__`` is also what the diff parser receives below.
        line = lines.__next__
        headers = {}
        hdr = None
        try:
            l = line()
            if l.startswith(b'From '):
                # Skip the envelope line that may precede the headers.
                l = line()
            while l:  # the first empty line terminates the header section
                if hdr is not None and l.startswith(b' '):
                    # Continuation line: append verbatim (leading space kept).
                    headers[hdr] += l
                else:
                    hdr, sep, val = l.partition(b':')
                    if not sep:
                        raise ValueError(f'Expected header line: {l!r}')
                    headers[hdr] = val.lstrip()
                l = line()
        except StopIteration:
            # Input ended inside the headers; report it the same way as an
            # EOF before the diff instead of leaking a bare StopIteration.
            raise RuntimeError('unexpected EOF') from None

        author = headers.pop(b'From').decode('utf-8')
        date = headers.pop(b'Date').decode('utf-8')
        title = strip_subject(headers.pop(b'Subject')).decode('utf-8')
        # Known-but-ignored headers; any remaining headers are ignored too.
        headers.pop(b'MIME-Version', None)
        headers.pop(b'Content-Type', None)
        headers.pop(b'Content-Transfer-Encoding', None)

        text = []
        for l in lines:
            if l.startswith(b'diff --git'):
                break
            text.append(l)
        else:
            raise RuntimeError('unexpected EOF')

        # Walk back from the diff: blank lines, then space-indented lines
        # (presumably the diffstat), then the mandatory '---' separator.
        while text and not text[-1]:
            del text[-1]
        while text and text[-1].startswith(b' '):
            del text[-1]
        if not text or text[-1] != b'---':
            raise RuntimeError('missing "---" separator before the diff')
        del text[-1]
        while text and not text[-1]:
            del text[-1]

        return tp(
            author=author,
            date=date,
            title=title,
            body=b'\n'.join(text).decode('utf-8'),
            # ``l`` is the 'diff --git' line that ended the loop above; the
            # diff parser pulls the remaining lines through ``line``.
            diff=patchtheory.Diff.parse_l(l, line),
        )

    def _set_id(self):
        """Compute ``self.id`` from author, date, title and the munged diff."""
        out = io.BytesIO()
        w = out.write
        for field in (self.author, self.date, self.title):
            w(field.encode('utf-8'))
            w(b'\n')
        self.diff.write_munged(w)
        dgst = hashlib.blake2b(out.getvalue()).digest()
        # 10 bytes -> exactly 16 base32 characters, no padding.
        self.id = base64.b32encode(dgst[:10]).lower().decode('ascii')

    def as_filename(self):
        """Return the title's alphanumeric runs joined by '-', cut to 72 chars."""
        return '-'.join(re.findall('[0-9A-Za-z]+', self.title))[:72]

    def _set_data(self):
        """Serialise the patch into ``self.data`` and derive ``self.hash``."""
        out = io.BytesIO()
        w = out.write
        w(b'From: ')
        w(self.author.encode('utf-8'))
        w(b'\nDate: ')
        w(self.date.encode('utf-8'))
        w(b'\nSubject: [PATCH] ')
        w(self.title.encode('utf-8'))
        w(b'\n\n')
        w(self.body.encode('utf-8'))
        w(b'\n---\n\n')
        self.diff.write(w)
        w(b'-- \n0.0.0 patchstate\n\n')
        self.data = data = out.getvalue()
        self.hash = base64.b32encode(hashlib.blake2b(data).digest()[:20]).lower().decode('ascii')

    def revert(self):
        """Return a new patch that applies the negated diff.

        Author, date and body are copied; the title becomes
        ``Revert "<title>"``.  The constructor already computes ``data``,
        ``hash`` and ``id``, so nothing is recomputed here.
        """
        return type(self)(
            author=self.author,
            date=self.date,
            title=f'Revert "{self.title}"',
            body=self.body,
            diff=-self.diff,
        )
class PatchInfo:
    """One entry of a series: a patch id plus a list of text lines.

    ``_body`` is laid out positionally as
    ``[patch_hash, patch_title, mode, *info]``; the properties below are
    read-only views onto those positions.
    """
    __slots__ = ('patch_id', '_body')

    def __init__(self, patch_id, info):
        """Store *patch_id* and the body list *info* (kept by reference)."""
        self.patch_id = patch_id
        self._body = info

    def get_patch(self, repo):
        """Look this entry's patch up in ``repo.patches`` by hash.

        Raises:
            KeyError: if the hash is unknown, or the stored patch's id or
                title disagrees with what this entry recorded.
        """
        p = repo.patches[self.patch_hash]
        if p.id != self.patch_id:
            raise KeyError(f'patch id mismatch: {self.patch_id!r} -> {p.id!r}')
        if p.title != self.patch_title:
            raise KeyError(f'patch title mismatch: {self.patch_title!r} -> {p.title!r}')
        return p

    @property
    def patch_hash(self):
        """First body line: the patch's content hash."""
        return self._body[0]

    @property
    def patch_title(self):
        """Second body line: the patch title."""
        return self._body[1]

    @property
    def mode(self):
        """Third body line: the entry's mode string."""
        return self._body[2]

    @property
    def info(self):
        """Remaining body lines, returned as a fresh list."""
        return self._body[3:]

    def update(self, *, patch=None, patch_id=None, patch_hash=None, patch_title=None, mode=None, new_mode=None, info=None):
        """Return a new ``PatchInfo`` with selected fields replaced.

        Every field that is left as ``None`` is taken from ``self``.  Passing
        *patch* supplies id and hash from that object (and is mutually
        exclusive with *patch_id*/*patch_hash*); its title must equal the
        effective *patch_title*.  When *new_mode* differs from the effective
        *mode*, a ``'(was <mode>)'`` line is prepended to the info lines and
        the mode is switched.  Neither ``self`` nor a caller-supplied *info*
        sequence is modified.

        Raises:
            ValueError: if *patch* carries a different title.
        """
        if patch_title is None:
            patch_title = self.patch_title
        if patch is not None:
            assert patch_id is None
            assert patch_hash is None
            if patch_title != patch.title:
                raise ValueError(f'patch title change: {patch_title!r} -> {patch.title!r}')
            patch_id = patch.id
            patch_hash = patch.hash
        else:
            if patch_id is None:
                patch_id = self.patch_id
            if patch_hash is None:
                patch_hash = self.patch_hash
        # ``self.info`` is already a fresh slice; copy a caller-supplied
        # sequence so the insert below never mutates the caller's list.
        info = self.info if info is None else list(info)
        if mode is None:
            mode = self.mode
        if new_mode is not None and new_mode != mode:
            info.insert(0, f'(was {mode})')
            mode = new_mode
        return PatchInfo(patch_id, [patch_hash, patch_title, mode, *info])
class Series:
    """An ordered list of ``PatchInfo`` entries with a text representation.

    Text format, one entry after another::

        patch <16-char id>
        <TAB>hash
        <TAB>title
        <TAB>mode
        <TAB>optional info lines...
    """
    __slots__ = ('info',)

    def __init__(self, /, info=None):
        """Wrap *info* (a list of entries); a new empty list when omitted."""
        if info is None:
            info = []
        self.info = info

    @classmethod
    def parse(tp, data):
        """Parse the text form produced by :meth:`fmt`.

        Blank lines between entries are skipped.  An entry may be terminated
        by any line that does not start with a tab or by the end of *data*
        (a missing final newline is accepted).

        Raises:
            ValueError: if a non-blank line is not a ``patch <id>`` header, or
                a header is followed by fewer than three tab-indented lines.
        """
        inf = []
        lines = iter(data.split('\n'))
        l = next(lines, None)  # ``None`` marks end of input from here on
        while l is not None:
            if not l:
                l = next(lines, None)
                continue
            m = rem_patch(l)
            if m is None:
                raise ValueError(f'Invalid header: {l!r}')
            pid = m.group(1)
            ldata = []
            while True:
                l = next(lines, None)
                # Hitting EOF ends the entry just like a non-indented line.
                if l is None or not l.startswith('\t'):
                    break
                ldata.append(l[1:])
            # NOTE(review): the count is checked before trailing blanks are
            # dropped, so an entry whose third line is blank still passes
            # here and ends up with fewer than three body lines.
            if len(ldata) < 3:
                raise ValueError(f'Expected data after {f"patch {pid}"!r}')
            while ldata and not ldata[-1]:
                del ldata[-1]
            inf.append(PatchInfo(pid, ldata))
        return tp(info=inf)

    def fmt(self):
        """Return the text form: a header per entry, then tab-indented lines."""
        out = io.StringIO()
        w = out.write
        for pi in self.info:
            w(f'patch {pi.patch_id}')
            for line in pi._body:
                w('\n\t')
                w(line)
            w('\n')
        return out.getvalue()
class Repository:
    """A directory of ``<hash>.patch`` files with in-memory indexes.

    Attributes (all set by ``__init__``):
        path:    the directory.
        patches: dict mapping content hash -> patch.
        by_id:   dict mapping patch id -> list of patches sharing that id.
        aliases: dict mapping a file stem that did not equal the hash of its
                 parsed contents -> that hash.
    """
    __slots__ = ('path', 'patches', 'aliases', 'by_id')

    def __init__(self, path):
        """Load every ``*.patch`` file found directly in *path*.

        Files that fail to parse with ``ValueError`` are reported on stderr
        and DELETED.  A file whose name is not ``<hash>.patch`` for the hash
        of its re-serialised contents is treated as an alias: unless a file
        under the proper name was loaded too, its contents are written out
        again as ``<hash>.patch`` (the old file is left in place).
        """
        self.path = path
        self.patches = {}
        self.by_id = {}
        aliases = {}
        for fn in os.listdir(path):
            if not fn.endswith('.patch'):
                continue
            fp = os.path.join(path, fn)
            with io.open(fp, 'rb') as f:
                data = f.read()
            try:
                p = Patch.parse(data)
            except ValueError as exc:
                print(f'Failed to parse {fn!r}: {exc}', file=sys.stderr)
                os.unlink(fp)
                continue
            h = p.hash
            if fn != f'{h}.patch':
                # fn[:-6] strips the '.patch' suffix.
                aliases[fn[:-6]] = (h, p)
            else:
                self.patches[h] = p
            self.by_id.setdefault(p.id, []).append(p)
        for h, (h2, p) in aliases.items():
            assert h != h2
            assert h not in self.patches
            if h2 in self.patches:
                continue
            print(f'Migrating patch: {h!r} -> {h2!r}', file=sys.stderr)
            self.patches[h2] = p
            # 'x' mode: refuse to overwrite an existing file.
            with io.open(os.path.join(self.path, f'{h2}.patch'), 'xb') as f:
                f.write(p.data)
        self.aliases = {h: h2 for h, (h2, p) in aliases.items()}

    def hash_data(self, data):
        """Return base32 (lower case) of the first 20 BLAKE2b digest bytes."""
        data = hashlib.blake2b(data).digest()
        return base64.b32encode(data[:20]).lower().decode()

    def insert_patch(self, patch):
        """Add *patch* unless one with the same hash is already stored.

        Returns the stored patch (the existing one on a hash collision).  A
        newly stored patch is indexed by id -- with a stderr warning if that
        id is already taken -- and written to ``<hash>.patch``.
        """
        p = self.patches.setdefault(patch.hash, patch)
        if p is patch:
            pid = p.id
            try:
                pd = self.by_id[pid]
            except KeyError:
                self.by_id[pid] = [p]
            else:
                print(f'Duplicate patch ID: {pid!r} {p.title}', file=sys.stderr)
                pd.append(p)
            with io.open(os.path.join(self.path, f'{p.hash}.patch'), 'xb') as f:
                f.write(p.data)
        return p

    def import_dir(self, path):
        """Parse and insert every ``*.patch`` file of *path* in sorted order.

        Returns a ``Series`` holding one entry with mode ``'new'`` per file;
        other files are skipped with a note on stderr.
        """
        series = Series()
        series.info = entries = []
        for fn in sorted(os.listdir(path)):
            if not fn.endswith('.patch'):
                print(f'Skipping {fn!r}', file=sys.stderr)
                continue
            with io.open(os.path.join(path, fn), 'rb') as f:
                patch = Patch.parse(f.read())
            patch = self.insert_patch(patch)
            entries.append(PatchInfo(patch.id, [patch.hash, patch.title, 'new']))
        return series

    def gc(self, keep):
        """Delete every stored patch whose hash is not in *keep*.

        The patch is dropped from ``patches`` and from ``by_id`` and its file
        is removed.  Cleaning ``by_id`` matters: a stale entry would make a
        later ``insert_patch`` report a bogus duplicate id.
        """
        # The set difference is a separate object, so mutating the dict while
        # looping over it is safe.
        for h in self.patches.keys() - keep:
            p = self.patches.pop(h)
            print(f'Removing patch: {h!r}', file=sys.stderr)
            os.unlink(os.path.join(self.path, f'{h}.patch'))
            same_id = self.by_id.get(p.id)
            if same_id is not None:
                # Filter by hash: an alias loaded at start-up is a different
                # object carrying the same hash.
                same_id[:] = [q for q in same_id if q.hash != h]
                if not same_id:
                    del self.by_id[p.id]
def argparse_next(args, func=...):
    """Pull the next positional argument from the iterator *args*.

    Tokens starting with ``-`` are passed to ``func``; if it returns anything
    other than ``None`` that value is the result.  The first token that does
    not start with ``-`` is returned as is.  A ``--`` token returns whatever
    follows it (or ``None`` when nothing does).  ``None`` is returned once
    *args* is exhausted.

    When *func* is omitted, the scanning function itself is returned so the
    option handler can be supplied in a second call (e.g. decorator-style).
    """
    exhausted = object()

    def do_argparse(func):
        token = next(args, exhausted)
        while token is not exhausted:
            if token == '--':
                return next(args, None)
            if not token.startswith('-'):
                return token
            handled = func(token)
            if handled is not None:
                return handled
            token = next(args, exhausted)
        return None

    if func is ...:
        return do_argparse
    return do_argparse(func)
def argparse_all(args, func=...):
    """Drain the iterator *args* and return all positional arguments.

    Tokens starting with ``-`` are passed to ``func``; a non-``None`` return
    value is treated as an iterable whose items are added to the result.
    Other tokens are collected as they are.  After a ``--`` token everything
    that remains is collected verbatim.

    When *func* is omitted, the scanning function itself is returned so the
    option handler can be supplied in a second call (e.g. decorator-style).
    """
    exhausted = object()

    def do_argparse(func):
        collected = []
        token = next(args, exhausted)
        while token is not exhausted:
            if token == '--':
                collected.extend(args)
                break
            if token.startswith('-'):
                extra = func(token)
                if extra is not None:
                    collected.extend(extra)
            else:
                collected.append(token)
            token = next(args, exhausted)
        return collected

    if func is ...:
        return do_argparse
    return do_argparse(func)