# SPDX-License-Identifier: GPL-3.0-only
"""WSGI application that signs data with a PKCS#11 token.

On import the module enumerates the certificates on the attached tokens,
loads the CA store from ``cacerts.pem`` and redirects stdin/stdout to
``/dev/tty`` so confirmation prompts and the PIN prompt reach the terminal.
The JSON endpoints are served by :func:`application`.
"""

import base64
import logging
import getpass
import hashlib
import json
import os
import re
import subprocess
import unicodedata
import time
import datetime

import OpenSSL.crypto as cr


class HashAlg:
    """Description of one supported hash algorithm.

    Attributes are documented next to their assignment in ``__init__``.
    """

    __slots__ = ('len', 'hashlib_name', 'mech', 'ident')

    def __init__(self, len, hashlib_name, mech, ident):
        # Digest length in bytes.
        self.len = len
        # Algorithm name understood by ``hashlib.new``.
        self.hashlib_name = hashlib_name
        # pkcs11-tool mechanism name that hashes and signs in one step.
        self.mech = mech
        # Bytes prepended to the digest when signing with plain RSA-PKCS
        # (the DER DigestInfo prefix of the algorithm).
        self.ident = ident


# TODO: autodetect?
# When true, the digest is computed locally and the token only performs the
# raw RSA-PKCS operation; otherwise the token's hash-and-sign mechanism is used.
USE_RSA_PKCS = True

HASH_ALG = {
    'SHA1': HashAlg(
        20, 'sha1', 'SHA1-RSA-PKCS',
        b'\x30\x21\x30\x09\x06\x05\x2b\x0e\x03\x02\x1a\x05\x00\x04\x14',
    ),
    'SHA256': HashAlg(
        32, 'sha256', 'SHA256-RSA-PKCS',
        b'\x30\x31\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x01\x05\x00\x04\x20',
    ),
    'SHA384': HashAlg(
        48, 'sha384', 'SHA384-RSA-PKCS',
        b'\x30\x41\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x02\x05\x00\x04\x30',
    ),
    'SHA512': HashAlg(
        64, 'sha512', 'SHA512-RSA-PKCS',
        b'\x30\x51\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x03\x05\x00\x04\x40',
    ),
}


def pkcs11_list():
    """Enumerate the certificates stored on all visible token slots.

    Runs ``pkcs11-tool --list-slots`` to find slot numbers, then
    ``pkcs15-tool`` per slot to list and read the certificates.

    Returns:
        dict mapping ``(slot, cert_id)`` (both strings) to the loaded
        certificate object.

    Raises:
        subprocess.CalledProcessError: if one of the tools exits non-zero.
    """
    lines = subprocess.check_output(['pkcs11-tool', '--list-slots'], text=True).splitlines()
    readers = []
    for line in lines:
        m = re.fullmatch('Slot ([0-9]+) \\(.*\\): .*', line)
        if m is None:
            continue
        readers.append(m.group(1))

    certs = {}
    for reader in readers:
        lines = subprocess.check_output(
            ['pkcs15-tool', '--reader', reader, '-c'], text=True,
        ).splitlines()
        for line in lines:
            # Only the tab-indented "ID : <hex>" lines identify a certificate.
            m = re.fullmatch('\tID *: *([0-9a-f]+)$', line)
            if m is None:
                continue
            cert_id = m.group(1)
            cert_data = subprocess.check_output(
                ['pkcs15-tool', '--reader', reader, '--read-certificate', cert_id],
            )
            certs[reader, cert_id] = cr.load_certificate(cr.FILETYPE_PEM, cert_data)
    return certs


def pkcs11_sign(key, msg, hash_alg, pin):
    """Sign ``msg`` on the token and return the raw signature bytes.

    Args:
        key: ``(slot, key_id)`` pair as used for the keys of ``pkcs11_list()``.
        msg: bytes to sign.
        hash_alg: a :class:`HashAlg` instance.
        pin: token PIN; handed to ``pkcs11-tool`` through the ``PIN``
            environment variable rather than on the command line.

    Raises:
        subprocess.CalledProcessError: if ``pkcs11-tool`` exits non-zero.
    """
    if USE_RSA_PKCS:
        mech = 'RSA-PKCS'
        # Hash locally and prepend the algorithm prefix; the token only pads
        # and signs.
        msg = hash_alg.ident + hashlib.new(hash_alg.hashlib_name, msg).digest()
    else:
        mech = hash_alg.mech
    reader, kid = key
    env = os.environ.copy()
    env['PIN'] = pin
    proc = subprocess.run(
        ['pkcs11-tool', '--slot', reader, '--id', kid, '-m', mech,
         '--pin', 'env:PIN', '--sign'],
        input=msg,
        stdout=subprocess.PIPE,
        env=env,
        check=True,
    )
    return proc.stdout


def load_ts(ts):
    """Parse an ASCII ``YYYYmmddHHMMSSZ`` byte string into an aware UTC datetime."""
    t = time.strptime(ts.decode('ascii'), '%Y%m%d%H%M%SZ')
    return datetime.datetime(
        t.tm_year, t.tm_mon, t.tm_mday,
        t.tm_hour, t.tm_min, t.tm_sec,
        tzinfo=datetime.timezone.utc,
    )


def check_selector(cert, selector, valid_only):
    """Return whether ``cert`` satisfies the request's selector.

    Args:
        cert: certificate object to test.
        selector: dict from the request; only its ``'issuers'`` entry is
            consulted, and it is matched against ``CN=<issuer CN>``.
        valid_only: when true, the certificate must be within its
            notBefore/notAfter window right now.
    """
    if valid_only:
        valid_from = load_ts(cert.get_notBefore())
        valid_until = load_ts(cert.get_notAfter())
        now = datetime.datetime.now(datetime.timezone.utc)
        if not (valid_from <= now <= valid_until):
            return False

    vals = selector.get('issuers')
    if vals is not None:
        issuer_cn = [
            val.decode('utf-8', 'surrogateescape')
            for name, val in cert.get_issuer().get_components()
            if name == b'CN'
        ]
        # Only the first CN is considered; a missing CN compares as empty.
        issuer_cn = issuer_cn[0] if issuer_cn else ''
        if f'CN={issuer_cn}' not in vals:
            return False
    return True


def describe_name(val):
    """Render an X.509 name as ``K=V, K=V`` with unsafe characters escaped."""
    return ', '.join(
        f'{k.decode("ascii")}={escape_bad(v.decode("utf-8", "surrogateescape"), ",=")}'
        for k, v in val.get_components()
    )


def describe_cert(cert):
    """Return a multi-line text summary (issuer, subject, validity) of ``cert``."""
    valid_from = load_ts(cert.get_notBefore())
    valid_until = load_ts(cert.get_notAfter())
    return (
        f'Issuer: {describe_name(cert.get_issuer())}\n'
        f'Subject: {describe_name(cert.get_subject())}\n'
        f'Valid from: {valid_from:%Y-%m-%d %H:%M:%S} UTC\n'
        f'Valid until: {valid_until:%Y-%m-%d %H:%M:%S} UTC\n'
    )


def escape_bad(l, esc=''):
    """Escape characters that are unsafe or invisible when printed.

    Args:
        l: text, possibly containing ``surrogateescape`` surrogates.
        esc: additional characters that must always be escaped.

    Returns:
        The text with backslashes doubled, undecodable bytes and ASCII
        control characters written as ``\\xNN``, and other control, format,
        surrogate and non-space separator characters written as ``\\uNNNN``
        or ``\\UNNNNNNNN``.
    """
    r = []
    for c in l:
        o = ord(c)
        # surrogateescape maps undecodable bytes 0x80..0xff to U+DC80..U+DCFF;
        # print them back as the original byte value.
        if 0xdc80 <= o <= 0xdcff:
            r.append(f'\\x{o - 0xdc00:02x}')
            continue
        if c == '\\':
            r.append('\\\\')
            continue
        if o < 32 or o == 127 or (c in esc and o < 128):
            r.append(f'\\x{o:02x}')
            continue
        cat = unicodedata.category(c)
        if cat in {'Cc', 'Cf', 'Cs'} or (cat.startswith('Z') and c != ' ') or c in esc:
            if o < 0x10000:
                r.append(f'\\u{o:04x}')
            else:
                r.append(f'\\U{o:08x}')
            continue
        r.append(c)
    return ''.join(r)


def print_blob(name, data, /):
    """Print ``data`` under a ``name`` heading, one ``>``-prefixed line each.

    ``str`` is printed as is, ``bytes`` is decoded as UTF-8 and escaped with
    :func:`escape_bad`, anything else is printed via ``repr``.
    """
    if type(data) is str:
        kind = 'text'
    elif type(data) is bytes:
        kind = 'blob'
        data = '\n'.join(
            escape_bad(l.decode('utf-8', 'surrogateescape'))
            for l in data.splitlines()
        )
    else:
        kind = 'repr'
        data = repr(data)
    print(f'{name} ({kind}):')
    for line in data.splitlines():
        print(f' > {line!s}')


def prompt(txt, /, default=False):
    """Ask a yes/no question on stdin until a valid answer is given.

    Args:
        txt: the question, without the trailing ``[y/N]?`` hint.
        default: value returned when the user just presses Enter.

    Returns:
        True for ``y``/``Y``, False for ``n``/``N``, ``default`` for an
        empty answer.
    """
    d = 'Y/n' if default else 'y/N'
    while True:
        # Keep the answer separate from the question so that an invalid
        # answer re-asks the same question.
        answer = input(f'{txt} [{d}]? ')
        if answer in {'y', 'Y'}:
            return True
        if answer in {'n', 'N'}:
            return False
        if not answer:
            return default


STATUS_TEXT = {
    200: 'OK',
    201: 'Created',
    202: 'Accepted',
    203: 'Non-Authoritative Information',
    204: 'No Content',
    205: 'Reset Content',
    206: 'Partial Content',
    207: 'Multi-Status',
    208: 'Already Reported',
    226: 'IM Used',
    300: 'Multiple Choices',
    301: 'Moved Permanently',
    302: 'Found',
    303: 'See Other',
    304: 'Not Modified',
    305: 'Use Proxy',
    306: 'Switch Proxy',
    307: 'Temporary Redirect',
    308: 'Permanent Redirect',
    400: 'Bad Request',
    401: 'Unauthorized',
    402: 'Payment Required',
    403: 'Forbidden',
    404: 'Not Found',
    405: 'Method Not Allowed',
    406: 'Not Acceptable',
    407: 'Proxy Authentication Required',
    408: 'Request Timeout',
    409: 'Conflict',
    410: 'Gone',
    411: 'Length Required',
    412: 'Precondition Failed',
    413: 'Payload Too Large',
    414: 'URI Too Long',
    415: 'Unsupported Media Type',
    416: 'Range Not Satisfiable',
    417: 'Expectation Failed',
    418: 'I\'m a teapot',
    421: 'Misdirected Request',
    422: 'Unprocessable Entity',
    423: 'Locked',
    424: 'Failed Dependency',
    425: 'Too Early',
    426: 'Upgrade Required',
    428: 'Precondition Required',
    429: 'Too Many Requests',
    431: 'Request Header Fields Too Large',
    451: 'Unavailable For Legal Reasons',
    500: 'Internal Server Error',
    501: 'Not Implemented',
    502: 'Bad Gateway',
    503: 'Service Unavailable',
    504: 'Gateway Timeout',
    505: 'HTTP Version Not Supported',
    506: 'Variant Also Negotiates',
    507: 'Insufficient Storage',
    508: 'Loop Detected',
    510: 'Not Extended',
    511: 'Network Authentication Required',
}

# Full status lines ("404 Not Found") as expected by WSGI start_response.
STATUS_FULL = {status: f'{status} {text}' for status, text in STATUS_TEXT.items()}


class Req:
    """One parsed WSGI request plus helpers that build response tuples.

    Every ``resp*`` method returns ``(status: int, headers: list, data: bytes)``.
    """

    __slots__ = ('path', 'query_string', 'method', 'headers', 'input')

    def __init__(self, env):
        """Extract path, method, body stream and ``HTTP_*`` headers from ``env``."""
        self.path = env['PATH_INFO']
        self.query_string = env.get('QUERY_STRING')
        self.method = env['REQUEST_METHOD']
        self.input = env['wsgi.input']
        headers = self.headers = {}
        for k, v in env.items():
            if not k.startswith('HTTP_'):
                continue
            # "HTTP_FOO_BAR" is stored under "FooBar" (words joined without
            # a separator), so "HTTP_ORIGIN" becomes "Origin".
            k = ''.join(map(str.title, k[5:].split('_')))
            headers[k] = v

    def read_data(self, /):
        """Read and return the whole request body; may only be called once."""
        d = self.input
        # Drop the stream reference so that a second read fails loudly.
        del self.input
        return d.read()

    def resp(self, /, status, headers, data, *, content_type):
        """Build a response tuple, adding CORS and caching headers.

        Args:
            status: integer HTTP status.
            headers: iterable of ``(name, value)`` string pairs.
            data: response body as bytes.
            content_type: value for ``Content-Type`` (not sent for OPTIONS).
        """
        assert type(status) is int
        assert type(data) is bytes
        assert type(content_type) is str
        headers = [*headers]
        assert all(type(k) is str and type(v) is str for k, v in headers)
        hdr = headers.append
        hdr(('Access-Control-Allow-Methods', 'GET, POST'))
        hdr(('Access-Control-Allow-Headers', 'Accept, Content-Type'))
        try:
            # Echo the caller's origin back; absent for non-CORS requests.
            hdr(('Access-Control-Allow-Origin', self.headers['Origin']))
        except KeyError:
            pass
        # NOTE(review): the original indentation was lost; it is assumed that
        # only Content-Type is skipped for OPTIONS -- confirm.
        if self.method != 'OPTIONS':
            hdr(('Content-Type', content_type))
        hdr(('Content-Length', f'{len(data)}'))
        hdr(('Cache-Control', 'no-cache'))
        return (status, headers, data)

    def resp_json(self, /, data, *, status=200,
                  content_type='application/json;charset=UTF-8', headers=()):
        """Build a response whose body is ``data`` serialized as JSON."""
        return self.resp(
            status,
            headers,
            json.dumps(data).encode('utf-8'),
            content_type=content_type,
        )

    def resp_error(self, status, /, message=None):
        """Build a plain-text error response.

        Args:
            status: integer HTTP status; its reason phrase is the body.
            message: optional detail appended after the reason phrase.
        """
        try:
            text = STATUS_TEXT[status]
        except KeyError:
            text = f'HTTP {status}'
        if message is not None:
            text = f'{text}: {message}'
        return self.resp(
            status,
            (),
            text.encode('utf-8'),
            content_type='text/plain',
        )

    def resp_404(self, /, **kw):
        """Build a 404 response; keyword arguments go to :meth:`resp_error`."""
        return self.resp_error(404, **kw)

    def resp_405(self, /, **kw):
        """Build a 405 response; keyword arguments go to :meth:`resp_error`."""
        return self.resp_error(405, **kw)

    def resp_biss_error(self, /, status, message):
        """Build a JSON failure response carrying ``status`` and ``message``."""
        return self.resp_json({
            'status': 'failed',
            'reasonCode': status,
            'reasonText': message,
        }, status=status)

    def resp_biss_ok(self, /, **kw):
        """Build a JSON success response; ``kw`` adds extra top-level fields."""
        return self.resp_json({
            'status': 'ok',
            'reasonCode': 200,
            'reasonText': 'response.success',
            **kw,
        })


# The flag is added to the flags class by hand before use.
# NOTE(review): 0x200000 is presumably OpenSSL's X509_V_FLAG_NO_CHECK_TIME -- confirm.
cr.X509StoreFlags.NO_CHECK_TIME = 0x200000

store = cr.X509Store()
store.set_flags(
    cr.X509StoreFlags.CRL_CHECK_ALL
    | cr.X509StoreFlags.X509_STRICT
    | cr.X509StoreFlags.NO_CHECK_TIME
)
store.load_locations('cacerts.pem')

# Certificates available for signing, keyed by (slot, cert_id).
certs = pkcs11_list()


def req_options(req):
    """Answer an OPTIONS request with an empty body and the CORS headers."""
    return req.resp(200, (), b'', content_type='')


def req_get_version(req):
    """Report the advertised version and capabilities."""
    return req.resp_json({
        'version': '2.30',
        'httpMethods': 'GET, POST',
        'contentTypes': 'data',
        'signatureTypes': 'signature',
        'selectorAvailable': True,
        'hashAlgorithms': 'SHA256, SHA384, SHA512',
    })


def req_get_status(req):
    """Answer the status probe with an empty JSON string."""
    return req.resp_json('')


def req_post_getsigner(req):
    """Return the first token certificate matching the request's selector.

    Responds 400 when ``showValidCerts`` is neither a boolean nor the
    strings ``'true'``/``'false'``, and 403 when no certificate matches.
    """
    data = json.loads(req.read_data().decode('utf-8'))
    valid_only = data.get('showValidCerts', False)
    # WAT: the flag may arrive as a string instead of a JSON boolean.
    if valid_only == 'true':
        valid_only = True
    if valid_only == 'false':
        valid_only = False
    if type(valid_only) is not bool:
        return req.resp_biss_error(400, 'error.request.bad-type')

    selector = data.get('selector', {})
    for cert in certs.values():
        if check_selector(cert, selector, valid_only):
            break
    else:
        return req.resp_biss_error(403, 'error.user-canceled')

    return req.resp_biss_ok(
        chain=[
            base64.b64encode(cr.dump_certificate(cr.FILETYPE_ASN1, cert)).decode('ascii'),
        ],
    )


def req_post_sign(req):
    """Verify, display and sign the messages of a sign request.

    Each message must carry a server signature that verifies against the
    accompanying server certificate. The operator is shown the messages and
    certificates, asked for confirmation and for the PIN, and every message
    is then signed with the key belonging to ``signerCertificateB64``.
    """
    data = json.loads(req.read_data().decode('utf-8'))
    try:
        own_cert_data = base64.b64decode(data['signerCertificateB64'])
    except (KeyError, IndexError, TypeError, ValueError):
        # binascii.Error (malformed base64) is a ValueError; a non-string
        # value raises TypeError.
        return req.resp_biss_error(400, 'error.wsp-cert-not-found')

    for own_key, own_cert in certs.items():
        if cr.dump_certificate(cr.FILETYPE_ASN1, own_cert) == own_cert_data:
            break
    else:
        return req.resp_biss_error(400, 'error.request.bad-type')

    # BISS ignores it
    # if data['contentType'] != 'data':
    #     return req.resp_biss_error(400, 'error.request.bad-type')

    try:
        hash_alg = HASH_ALG[data.get('hashAlgorithm', 'SHA256')]
    except (KeyError, TypeError):
        # TypeError: the value was not hashable (e.g. a JSON array).
        return req.resp_biss_error(400, 'error.request.bad-type')

    msgs = []
    for msg, sig, server_cert in zip(
            data['contents'], data['signedContents'], data['signedContentsCert']):
        msg = base64.b64decode(msg)
        sig = base64.b64decode(sig)
        server_cert = cr.load_certificate(cr.FILETYPE_ASN1, base64.b64decode(server_cert))
        try:
            # WAT? The server signs the digest of the message, so the
            # verified payload is the digest and it gets hashed once more.
            cr.verify(
                server_cert, sig,
                hashlib.new(hash_alg.hashlib_name, msg).digest(),
                hash_alg.hashlib_name,
            )
        except cr.Error:
            return req.resp_biss_error(400, 'error.request.signature-not-val')
        verify_ctx = cr.X509StoreContext(store, server_cert, None)
        try:
            verify_ctx.verify_certificate()
        except cr.X509StoreContextError as exc:
            # An untrusted server certificate is not fatal: the operator decides.
            if not prompt(f'Server certificate invalid ({exc!s}); continue'):
                return req.resp_biss_error(403, 'error.request.certificate-not-valid')
        msgs.append((msg, server_cert))

    t = data.get('confirmText')
    if t is not None:
        print_blob('Confirmation text', t)
    t = data.get('additionalConfirmText')
    if t is not None:
        print_blob('Additional confirmation text', t)

    # Number the messages from 1 in the order they will be signed.
    for idx, (msg, server_cert) in enumerate(msgs, 1):
        print_blob(f'Message #{idx}', msg)
        print_blob('Server name', describe_cert(server_cert))

    if not prompt('Sign'):
        return req.resp_biss_error(403, 'error.user-canceled')

    pin = getpass.getpass('PIN: ')
    sigs = [pkcs11_sign(own_key, msg, hash_alg, pin) for msg, _ in msgs]
    return req.resp_biss_ok(
        signatures=[base64.b64encode(sig).decode('ascii') for sig in sigs],
    )


def req_post_chain_found(req):
    """Stub: parse the request body and always report that a chain was found."""
    # stub -- the body is parsed only so that malformed JSON is still rejected.
    json.loads(req.read_data().decode('utf-8'))
    return req.resp_biss_ok(chainFound=True)


def fix_methods(hnd):
    """Return a copy of a method table with HEAD and OPTIONS filled in.

    Args:
        hnd: dict mapping HTTP method names to handler callables.

    Returns:
        A new dict that additionally maps ``'HEAD'`` (derived from ``'GET'``
        when missing, same status and headers but no body) and ``'OPTIONS'``.
    """
    assert type(hnd) is dict and all(type(k) is str for k in hnd)
    hnd = hnd.copy()
    if 'GET' in hnd and 'HEAD' not in hnd:
        on_get = hnd['GET']

        def on_head(req):
            (status, headers, data) = on_get(req)
            return (status, headers, None)

        hnd['HEAD'] = on_head
    hnd['OPTIONS'] = req_options
    return hnd


def resp_exc(env, resp, exc):
    """Start a generic 500 response and return its HTML body as bytes.

    Args:
        env: WSGI environ (unused).
        resp: WSGI ``start_response`` callable.
        exc: the exception being reported (unused; it is logged by the caller).
    """
    # NOTE(review): the markup of this literal was lost in the copy under
    # review; the tags below are a reconstruction -- confirm against the
    # upstream file.
    rdata = (
        '<!DOCTYPE html><html><head><title>Internal Server Error</title></head>'
        '<body><h1>Internal Server Error</h1></body>'
        '</html>'
    ).encode()
    resp('500 Internal Server Error', [
        ('Content-Type', 'text/html;charset=utf-8'),
        ('Content-Length', f'{len(rdata)}'),
    ])
    return rdata


handlers = {
    '/version': {
        'GET': req_get_version,
    },
    '/status': {
        'GET': req_get_status,
    },
    '/getsigner': {
        'POST': req_post_getsigner,
    },
    '/sign': {
        'POST': req_post_sign,
    },
    '/chainFound': {
        'POST': req_post_chain_found,
    },
}
handlers = {path: fix_methods(sub) for path, sub in handlers.items()}


def application(env, resp):
    """WSGI entry point: dispatch on path and method and send the response.

    Unknown paths yield 404, unsupported methods 405, and any exception
    raised while parsing or handling the request yields a generic 500.

    Returns:
        A list holding the response body (empty when there is no body), as
        WSGI requires an iterable of bytestrings.
    """
    try:
        req = Req(env)
    except Exception as exc:
        logging.exception('Failed to parse request')
        return [resp_exc(env, resp, exc)]

    try:
        h = handlers[req.path]
    except KeyError:
        h = Req.resp_404
    else:
        try:
            h = h[req.method]
        except KeyError:
            h = Req.resp_405

    try:
        rcode, rhdr, rdata = h(req)
    except Exception as exc:
        logging.exception('Request handler failed')
        return [resp_exc(env, resp, exc)]

    assert type(rcode) is int
    try:
        rcode = STATUS_FULL[rcode]
    except KeyError:
        rcode = f'{rcode}'
    resp(rcode, rhdr)
    # HEAD handlers return None for the body.
    return [rdata] if rdata else []


# Route stdin/stdout to the controlling terminal so that the prompts work
# even when the server started the process with redirected streams.
tty = os.open('/dev/tty', os.O_RDWR)
os.dup2(tty, 0)
os.dup2(tty, 1)