summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pkcs11.py92
-rw-r--r--rebiss.py94
2 files changed, 107 insertions, 79 deletions
diff --git a/pkcs11.py b/pkcs11.py
new file mode 100644
index 0000000..5256f36
--- /dev/null
+++ b/pkcs11.py
@@ -0,0 +1,92 @@
+import typing
+import subprocess
+import hashlib
+import re
+import os
+import OpenSSL.crypto as cr
+
+
+class HashAlg:
+ __slots__ = ('len', 'hashlib_name', 'mech', 'ident')
+
+ def __init__(self, len, hashlib_name, mech, ident):
+ self.len = len
+ self.hashlib_name = hashlib_name
+ self.mech = mech
+ self.ident = ident
+
+HASH_ALG = {
+ 'SHA1': HashAlg(20, 'sha1', 'SHA1-RSA-PKCS', b'\x30\x21\x30\x09\x06\x05\x2b\x0e\x03\x02\x1a\x05\x00\x04\x14'),
+ 'SHA256': HashAlg(32, 'sha256', 'SHA256-RSA-PKCS', b'\x30\x31\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x01\x05\x00\x04\x20'),
+ 'SHA384': HashAlg(48, 'sha384', 'SHA384-RSA-PKCS', b'\x30\x41\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x02\x05\x00\x04\x30'),
+ 'SHA512': HashAlg(64, 'sha512', 'SHA512-RSA-PKCS', b'\x30\x51\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x03\x05\x00\x04\x40'),
+}
+
+
+class Key:
+ __slots__ = ('reader', 'id', 'cert_data', 'cert', 'use_rsa_pkcs', 'pin')
+
+ reader: str
+ id: str
+ cert_data: bytes
+ cert: typing.Any
+ use_rsa_pkcs: bool
+ pin: str
+
+ def __init__(self, reader: str, id: str) -> None:
+ self.reader = reader
+ self.id = id
+ cert_data = self.cert_data = subprocess.check_output(['pkcs11-tool', '--slot', reader, '-y', 'cert', '-r', '-d', id])
+ self.cert = cr.load_certificate(cr.FILETYPE_ASN1, cert_data)
+ # TODO: autodetect based on key?
+ self.use_rsa_pkcs = True
+
+ def sign(self, msg, hash_alg):
+ if self.use_rsa_pkcs:
+ mech = 'RSA-PKCS'
+ msg = hash_alg.ident + hashlib.new(hash_alg.hashlib_name, msg).digest()
+ else:
+ mech = hash_alg.mech
+
+ env = os.environ.copy()
+ env['PIN'] = self.pin
+ proc = subprocess.run(
+ ['pkcs11-tool', '--slot', self.reader, '--id', self.id, '-m', mech, '--pin', 'env:PIN', '--sign'],
+ input = msg,
+ stdout = subprocess.PIPE,
+ env = env,
+ check = True,
+ )
+
+ return proc.stdout
+
+class Store:
+ __slots__ = ('_impl',)
+
+ def __init__(self):
+ self._impl = cr.X509Store()
+
+ def load_file(self, path):
+ self._impl.load_locations(path, None)
+
+ def cert_chain(self, key: Key):
+ ctx = cr.X509StoreContext(self._impl, key.cert)
+ return ctx.get_verified_chain()
+
+def list() -> typing.Iterable[Key]:
+ lines = subprocess.check_output(['pkcs11-tool', '--list-slots'], text=True).splitlines()
+ readers = []
+ for line in lines:
+ m = re.fullmatch('Slot ([0-9]+) \\(.*\\): .*', line)
+ if m is None:
+ continue
+ readers.append(m.group(1))
+
+ certs = {}
+ for reader in readers:
+ lines = subprocess.check_output(['pkcs11-tool', '--slot', reader, '-y', 'cert', '-O'], text=True).splitlines()
+ for line in lines:
+ m = re.fullmatch(' ID *: *([0-9a-f]+)$', line)
+ if m is None:
+ continue
+ yield Key(reader, m.group(1))
diff --git a/rebiss.py b/rebiss.py
index 8df6a2c..51dbf30 100644
--- a/rebiss.py
+++ b/rebiss.py
@@ -6,77 +6,13 @@ import getpass
import hashlib
import json
import os
-import re
-import subprocess
import unicodedata
import time
import datetime
import OpenSSL.crypto as cr
-
-class HashAlg:
- __slots__ = ('len', 'hashlib_name', 'mech', 'ident')
-
- def __init__(self, len, hashlib_name, mech, ident):
- self.len = len
- self.hashlib_name = hashlib_name
- self.mech = mech
- self.ident = ident
-
-# TODO: autodetect?
-USE_RSA_PKCS = True
-
-HASH_ALG = {
- 'SHA1': HashAlg(20, 'sha1', 'SHA1-RSA-PKCS', b'\x30\x21\x30\x09\x06\x05\x2b\x0e\x03\x02\x1a\x05\x00\x04\x14'),
- 'SHA256': HashAlg(32, 'sha256', 'SHA256-RSA-PKCS', b'\x30\x31\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x01\x05\x00\x04\x20'),
- 'SHA384': HashAlg(48, 'sha384', 'SHA384-RSA-PKCS', b'\x30\x41\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x02\x05\x00\x04\x30'),
- 'SHA512': HashAlg(64, 'sha512', 'SHA512-RSA-PKCS', b'\x30\x51\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x03\x05\x00\x04\x40'),
-}
-
-
-def pkcs11_list():
- lines = subprocess.check_output(['pkcs11-tool', '--list-slots'], text=True).splitlines()
- readers = []
- for line in lines:
- m = re.fullmatch('Slot ([0-9]+) \\(.*\\): .*', line)
- if m is None:
- continue
- readers.append(m.group(1))
-
- certs = {}
- for reader in readers:
- lines = subprocess.check_output(['pkcs15-tool', '--reader', reader, '-c'], text=True).splitlines()
- for line in lines:
- m = re.fullmatch('\tID *: *([0-9a-f]+)$', line)
- if m is None:
- continue
- cert_id = m.group(1)
- cert_data = subprocess.check_output(['pkcs15-tool', '--reader', reader, '--read-certificate', cert_id])
- cert = cr.load_certificate(cr.FILETYPE_PEM, cert_data)
- certs[reader,cert_id] = cert
-
- return certs
-
-def pkcs11_sign(key, msg, hash_alg, pin):
- if USE_RSA_PKCS:
- mech = 'RSA-PKCS'
- msg = hash_alg.ident + hashlib.new(hash_alg.hashlib_name, msg).digest()
- else:
- mech = hash_alg.mech
-
- reader,kid = key
- env = os.environ.copy()
- env['PIN'] = pin
- proc = subprocess.run(
- ['pkcs11-tool', '--slot', reader, '--id', kid, '-m', mech, '--pin', 'env:PIN', '--sign'],
- input = msg,
- stdout = subprocess.PIPE,
- env = env,
- check = True,
- )
-
- return proc.stdout
+import pkcs11
def load_ts(ts):
@@ -295,8 +231,6 @@ class Req:
text = STATUS_TEXT[status]
except KeyError:
text = f'HTTP {status}'
- if message is not None:
- text = f'{text}: {message}'
return self.resp(
status,
(),
@@ -305,10 +239,10 @@ class Req:
)
def resp_404(self, /):
- return self.resp_error(404, **kw)
+ return self.resp_error(404)
def resp_405(self, /):
- return self.resp_error(405, **kw)
+ return self.resp_error(405)
def resp_biss_error(self, /, status, message):
return self.resp_json({
@@ -331,14 +265,14 @@ store = cr.X509Store()
store.set_flags(cr.X509StoreFlags.CRL_CHECK_ALL | cr.X509StoreFlags.X509_STRICT | cr.X509StoreFlags.NO_CHECK_TIME)
store.load_locations('cacerts.pem')
-certs = pkcs11_list()
+keys = [*pkcs11.list()]
def req_options(req):
return req.resp(200, (), b'', content_type='')
def req_get_version(req):
return req.resp_json({
- 'version': '2.30',
+ 'version': '3.15',
'httpMethods': 'GET, POST',
'contentTypes': 'data',
'signatureTypes': 'signature',
@@ -363,15 +297,15 @@ def req_post_getsigner(req):
selector = data.get('selector', {})
- for cert in certs.values():
- if check_selector(cert, selector, valid_only):
+ for key in keys:
+ if check_selector(key.cert, selector, valid_only):
break
else:
return req.resp_biss_error(403, 'error.user-canceled')
return req.resp_biss_ok(
chain = [
- base64.b64encode(cr.dump_certificate(cr.FILETYPE_ASN1, cert)).decode('ascii'),
+ base64.b64encode(key.cert_data).decode('ascii'),
],
)
@@ -384,10 +318,11 @@ def req_post_sign(req):
except (KeyError, IndexError):
return req.resp_biss_error(400, 'error.wsp-cert-not-found')
- for own_key,own_cert in certs.items():
- if cr.dump_certificate(cr.FILETYPE_ASN1, own_cert) == own_cert_data:
+ for own_key in keys:
+ if own_key.cert_data == own_cert_data:
break
else:
+ logging.error('Unknown certificate')
return req.resp_biss_error(400, 'error.request.bad-type')
# BISS ignores it
@@ -395,8 +330,9 @@ def req_post_sign(req):
# return req.resp_biss_error(400, 'error.request.bad-type')
try:
- hash_alg = HASH_ALG[data.get('hashAlgorithm', 'SHA256')]
+ hash_alg = pkcs11.HASH_ALG[data.get('hashAlgorithm', 'SHA256')]
except KeyError:
+ logging.error('Unknown hash algorithm')
return req.resp_biss_error(400, 'error.request.bad-type')
msgs = []
@@ -435,11 +371,11 @@ def req_post_sign(req):
if not prompt('Sign'):
return req.resp_biss_error(403, 'error.user-canceled')
- pin = getpass.getpass('PIN: ')
+ own_key.pin = getpass.getpass('PIN: ')
sigs = []
for (msg, server_cert) in msgs:
- sigs.append(pkcs11_sign(own_key, msg, hash_alg, pin))
+ sigs.append(own_key.sign(msg, hash_alg))
return req.resp_biss_ok(
signatures = [base64.b64encode(sig).decode('ascii') for sig in sigs],