diff options
-rw-r--r-- | pkcs11.py | 92 | ||||
-rw-r--r-- | rebiss.py | 94 |
2 files changed, 107 insertions, 79 deletions
diff --git a/pkcs11.py b/pkcs11.py new file mode 100644 index 0000000..5256f36 --- /dev/null +++ b/pkcs11.py @@ -0,0 +1,92 @@ +import typing +import subprocess +import hashlib +import re +import os +import OpenSSL.crypto as cr + + +class HashAlg: + __slots__ = ('len', 'hashlib_name', 'mech', 'ident') + + def __init__(self, len, hashlib_name, mech, ident): + self.len = len + self.hashlib_name = hashlib_name + self.mech = mech + self.ident = ident + +HASH_ALG = { + 'SHA1': HashAlg(20, 'sha1', 'SHA1-RSA-PKCS', b'\x30\x21\x30\x09\x06\x05\x2b\x0e\x03\x02\x1a\x05\x00\x04\x14'), + 'SHA256': HashAlg(32, 'sha256', 'SHA256-RSA-PKCS', b'\x30\x31\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x01\x05\x00\x04\x20'), + 'SHA384': HashAlg(48, 'sha384', 'SHA384-RSA-PKCS', b'\x30\x41\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x02\x05\x00\x04\x30'), + 'SHA512': HashAlg(64, 'sha512', 'SHA512-RSA-PKCS', b'\x30\x51\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x03\x05\x00\x04\x40'), +} + + +class Key: + __slots__ = ('reader', 'id', 'cert_data', 'cert', 'use_rsa_pkcs', 'pin') + + reader: str + id: str + cert_data: bytes + cert: typing.Any + use_rsa_pkcs: bool + pin: str + + def __init__(self, reader: str, id: str) -> None: + self.reader = reader + self.id = id + cert_data = self.cert_data = subprocess.check_output(['pkcs11-tool', '--slot', reader, '-y', 'cert', '-r', '-d', id]) + self.cert = cr.load_certificate(cr.FILETYPE_ASN1, cert_data) + # TODO: autodetect based on key? + self.use_rsa_pkcs = True + + def sign(self, msg, hash_alg): + if self.use_rsa_pkcs: + mech = 'RSA-PKCS' + msg = hash_alg.ident + hashlib.new(hash_alg.hashlib_name, msg).digest() + else: + mech = hash_alg.mech + + env = os.environ.copy() + env['PIN'] = self.pin + proc = subprocess.run( + ['pkcs11-tool', '--slot', self.reader, '--id', self.id, '-m', mech, '--pin', 'env:PIN', '--sign'], + input = msg, + stdout = subprocess.PIPE, + env = env, + check = True, + ) + + return proc.stdout + +class Store: + __slots__ = ('_impl',) + + def __init__(self): + self._impl = cr.X509Store() + + def load_file(self, path): + self._impl.load_locations(path, None) + + def cert_chain(self, key: Key): + ctx = cr.X509StoreContext(self._impl, key.cert) + return ctx.get_verified_chain() + +def list() -> typing.Iterable[Key]: + lines = subprocess.check_output(['pkcs11-tool', '--list-slots'], text=True).splitlines() + readers = [] + for line in lines: + m = re.fullmatch('Slot ([0-9]+) \\(.*\\): .*', line) + if m is None: + continue + readers.append(m.group(1)) + + certs = {} + for reader in readers: + lines = subprocess.check_output(['pkcs11-tool', '--slot', reader, '-y', 'cert', '-O'], text=True).splitlines() + for line in lines: + m = re.fullmatch(' ID *: *([0-9a-f]+)$', line) + if m is None: + continue + yield Key(reader, m.group(1)) @@ -6,77 +6,13 @@ import getpass import hashlib import json import os -import re -import subprocess import unicodedata import time import datetime import OpenSSL.crypto as cr - -class HashAlg: - __slots__ = ('len', 'hashlib_name', 'mech', 'ident') - - def __init__(self, len, hashlib_name, mech, ident): - self.len = len - self.hashlib_name = hashlib_name - self.mech = mech - self.ident = ident - -# TODO: autodetect? -USE_RSA_PKCS = True - -HASH_ALG = { - 'SHA1': HashAlg(20, 'sha1', 'SHA1-RSA-PKCS', b'\x30\x21\x30\x09\x06\x05\x2b\x0e\x03\x02\x1a\x05\x00\x04\x14'), - 'SHA256': HashAlg(32, 'sha256', 'SHA256-RSA-PKCS', b'\x30\x31\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x01\x05\x00\x04\x20'), - 'SHA384': HashAlg(48, 'sha384', 'SHA384-RSA-PKCS', b'\x30\x41\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x02\x05\x00\x04\x30'), - 'SHA512': HashAlg(64, 'sha512', 'SHA512-RSA-PKCS', b'\x30\x51\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x03\x05\x00\x04\x40'), -} - - -def pkcs11_list(): - lines = subprocess.check_output(['pkcs11-tool', '--list-slots'], text=True).splitlines() - readers = [] - for line in lines: - m = re.fullmatch('Slot ([0-9]+) \\(.*\\): .*', line) - if m is None: - continue - readers.append(m.group(1)) - - certs = {} - for reader in readers: - lines = subprocess.check_output(['pkcs15-tool', '--reader', reader, '-c'], text=True).splitlines() - for line in lines: - m = re.fullmatch('\tID *: *([0-9a-f]+)$', line) - if m is None: - continue - cert_id = m.group(1) - cert_data = subprocess.check_output(['pkcs15-tool', '--reader', reader, '--read-certificate', cert_id]) - cert = cr.load_certificate(cr.FILETYPE_PEM, cert_data) - certs[reader,cert_id] = cert - - return certs - -def pkcs11_sign(key, msg, hash_alg, pin): - if USE_RSA_PKCS: - mech = 'RSA-PKCS' - msg = hash_alg.ident + hashlib.new(hash_alg.hashlib_name, msg).digest() - else: - mech = hash_alg.mech - - reader,kid = key - env = os.environ.copy() - env['PIN'] = pin - proc = subprocess.run( - ['pkcs11-tool', '--slot', reader, '--id', kid, '-m', mech, '--pin', 'env:PIN', '--sign'], - input = msg, - stdout = subprocess.PIPE, - env = env, - check = True, - ) - - return proc.stdout +import pkcs11 def load_ts(ts): @@ -295,8 +231,6 @@ class Req: text = STATUS_TEXT[status] except KeyError: text = f'HTTP {status}' - if message is not None: - text = f'{text}: {message}' return self.resp( status, (), @@ -305,10 +239,10 @@ class Req: ) def resp_404(self, /): - return self.resp_error(404, **kw) + return self.resp_error(404) def resp_405(self, /): - return self.resp_error(405, **kw) + return self.resp_error(405) def resp_biss_error(self, /, status, message): return self.resp_json({ @@ -331,14 +265,14 @@ store = cr.X509Store() store.set_flags(cr.X509StoreFlags.CRL_CHECK_ALL | cr.X509StoreFlags.X509_STRICT | cr.X509StoreFlags.NO_CHECK_TIME) store.load_locations('cacerts.pem') -certs = pkcs11_list() +keys = [*pkcs11.list()] def req_options(req): return req.resp(200, (), b'', content_type='') def req_get_version(req): return req.resp_json({ - 'version': '2.30', + 'version': '3.15', 'httpMethods': 'GET, POST', 'contentTypes': 'data', 'signatureTypes': 'signature', @@ -363,15 +297,15 @@ def req_post_getsigner(req): selector = data.get('selector', {}) - for cert in certs.values(): - if check_selector(cert, selector, valid_only): + for key in keys: + if check_selector(key.cert, selector, valid_only): break else: return req.resp_biss_error(403, 'error.user-canceled') return req.resp_biss_ok( chain = [ - base64.b64encode(cr.dump_certificate(cr.FILETYPE_ASN1, cert)).decode('ascii'), + base64.b64encode(key.cert_data).decode('ascii'), ], ) @@ -384,10 +318,11 @@ def req_post_sign(req): except (KeyError, IndexError): return req.resp_biss_error(400, 'error.wsp-cert-not-found') - for own_key,own_cert in certs.items(): - if cr.dump_certificate(cr.FILETYPE_ASN1, own_cert) == own_cert_data: + for own_key in keys: + if own_key.cert_data == own_cert_data: break else: + logging.error('Unknown certificate') return req.resp_biss_error(400, 'error.request.bad-type') # BISS ignores it @@ -395,8 +330,9 @@ def req_post_sign(req): # return req.resp_biss_error(400, 'error.request.bad-type') try: - hash_alg = HASH_ALG[data.get('hashAlgorithm', 'SHA256')] + hash_alg = pkcs11.HASH_ALG[data.get('hashAlgorithm', 'SHA256')] except KeyError: + logging.error('Unknown hash algorithm') return req.resp_biss_error(400, 'error.request.bad-type') msgs = [] @@ -435,11 +371,11 @@ def req_post_sign(req): if not prompt('Sign'): return req.resp_biss_error(403, 'error.user-canceled') - pin = getpass.getpass('PIN: ') + own_key.pin = getpass.getpass('PIN: ') sigs = [] for (msg, server_cert) in msgs: - sigs.append(pkcs11_sign(own_key, msg, hash_alg, pin)) + sigs.append(own_key.sign(msg, hash_alg)) return req.resp_biss_ok( signatures = [base64.b64encode(sig).decode('ascii') for sig in sigs], |