#!/usr/bin/python3 import argparse import sqlite3 from cryptography import x509 from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.serialization import PrivateFormat, NoEncryption, BestAvailableEncryption, load_pem_private_key from cryptography.x509.oid import NameOID from cryptography.hazmat.primitives.serialization import Encoding from concurrent.futures import ThreadPoolExecutor import datetime def parser_factory(): parser = argparse.ArgumentParser(description="Programm zur Administration von Zertifikaten") parser.add_argument("--db", type=str, help="Datenbank die benutzt wird.", default="certificates.sqlite3") subparser = parser.add_subparsers(dest="aktion", title="Kommandos", ) newca = subparser.add_parser("newca", help="erstellt ein neues CA-Zertifikat") request = subparser.add_parser("request", help="erstellt eine Zertifikatsanfrage") sign = subparser.add_parser("sign", help="TBI Signiert eine Zertifikatsanfrage") config = subparser.add_parser("config", help="TBI") for kommando in [ newca, request]: kommando.add_argument("--length", type=int, help="Schlüssellänge des Zertifikats, sollte ein Exponent von 2 und größer als 1024 sein", default=4096, dest="länge") kommando.add_argument("--commonname", type=str, help="Name des Besitzers", required=True, dest="cn") kommando.add_argument("--location", help="Stadt", dest="l") kommando.add_argument("--state", help="Bundesland/Kanton", dest="st") kommando.add_argument("--organization", help="Firma", dest="o") kommando.add_argument("--ou", help="Abteilung", dest="ou") kommando.add_argument("--country", help="Land", dest="c") kommando.add_argument("--street", help="Straße", dest="street") request.add_argument( "--san", help="Alternativer Name für die Resource, kann eine Emailadresse, URI oder eine Domain sein. sonderzeichen in Domains sollten bereits mittels IDNA umkodiert worden sein.", action="append", dest="san", nargs="?", ) request.add_argument("-s", dest="server", action="store_true", help="Serverzertifikat") request.add_argument("-cl", dest="client", action="store_true", help="Clientzertifikat") request.add_argument("-co", dest="code", action="store_true", help="Codesignaturzertifikat") request.add_argument("-e", dest="email", action="store_true", help="Emailzertifikat") request.add_argument("--ca", dest="ca", help="Certificate Authority, Zertifizierungstelle mit optionaler Pfadlänge", nargs="?", type=int) newca.add_argument("tage", type=int, help="Tage die das Zertifikat gültig ist") return parser def initdb(cursor): cursor.execute("CREATE TABLE IF NOT EXISTS \"certificates\" ( `commonname` TEXT NOT NULL, `serial` TEXT NOT NULL UNIQUE, `publickey` TEXT NOT NULL, `privatkey` TEXT NOT NULL UNIQUE, `parent` integer, `ca` INTEGER NOT NULL, PRIMARY KEY(`commonname`), FOREIGN KEY(`parent`) REFERENCES `certificates`(`serial`) ) WITHOUT ROWID") cursor.execute("CREATE TABLE IF NOT EXISTS \"requests\" ( `commonname` TEXT NOT NULL, `csr` TEXT NOT NULL UNIQUE, `privatkey` TEXT NOT NULL, PRIMARY KEY(`commonname`) )") def print_wrp(text): def wrapper(*t): print(text) return wrapper def str_exttype(exttype): if isinstance(exttype, x509.BasicConstraints): if not exttype.ca: return "Keine CA" if exttype.path_length in [None, 0]: return "CA, unendliche Pfadlänge" return "CA, Pfadlänge {}".format(exttype.path_length) elif isinstance(exttype, x509.ExtendedKeyUsage): text = "" for usage in exttype: text += usage._name + ", " return text[:-2] elif isinstance(exttype, x509.SubjectAlternativeName): text = "" for an in exttype._general_names: text += an._value + ", " return text[:-2] else: return repr(exttype) def print_info(cert): for name in cert.subject.rdns: for oid in name: print("{}: {}".format(oid._oid._name, oid.value)) for extension in cert.extensions: kritisch = "kritisch" if not extension.critical: kritisch = "nicht" + kritisch print("{}({}): {}".format(extension.oid._name, kritisch, str_exttype(extension.value))) class Certwrapper: def __init__(self, backend): self.threads = ThreadPoolExecutor(3, "cryptoops") self.anfragetemplate = x509.CertificateSigningRequestBuilder() self.backend = backend def gencert(self, länge): return self.threads.submit(rsa.generate_private_key, 65537, länge, self.backend) def sign(self, subject, issuer, time_valid, public_key, privat_ca_key, extensions=None, ca_key=None): if not extensions: extensions = [] builder = x509.CertificateBuilder() jetzt = datetime.datetime.now() builder = builder.serial_number(x509.random_serial_number()) builder = builder.not_valid_before(jetzt) builder = builder.not_valid_after(jetzt + time_valid) builder = builder.subject_name(subject) builder = builder.issuer_name(issuer) builder = builder.public_key(public_key) for extension in extensions: builder = builder.add_extension(extension, True) if ca_key: builder = builder.add_extension(x509.AuthorityKeyIdentifier(ca_key.fingerprint(hashes.SHA512()), None, None), True) return builder.sign(privat_ca_key, hashes.SHA512(), self.backend) def request(self, subject, erweiterungen=[], certlänge=4096): cert = self.gencert(certlänge) anfrage = self.anfragetemplate.subject_name(subject) for erweiterung in erweiterungen: anfrage = anfrage.add_extension(erweiterung, True) anfrage = anfrage.sign(cert.result(), hashes.SHA512(), self.backend) return { "privat": cert.result(), "csr": anfrage } class DBbackend: def __init__(self, connection): self.connection = connection def insert_certificate(self, commonname: str, serial, publickey, privatkey, ca: bool): self.connection.execute( "INSERT INTO `certificates`(commonname, serial, publickey, privatkey, ca) VALUES (?, ?, ?, ?, ?)", ( commonname, serial, publickey, privatkey, ca ) ) self.connection.commit() def insert_request(self, commonname, cst, privatkey): self.connection.execute( "INSERT INTO `requests`(commonname, csr, privatkey) VALUES (?, ?, ?)", ( commonname, csr, privatkey, ) ) self.connection.commit() class Commands: def __init__(self, backend, connection, args): self.backend = backend self.threads = ThreadPoolExecutor(3, "cryptoops") self.connection = connection self.connection.row_factory = sqlite3.Row self.cursor = connection.cursor() self.arguments = args self.wrapper = Certwrapper(backend) self.db = DBbackend(connection) def get(self, name): try: return self.__getattribute__(name) except (AttributeError, TypeError): return None def fragen(self, fragen, typ=tuple): antworten = [] for id in fragen: antworten.append(typ(id, self.frage(fragen[id], False))) return antworten def frage(self, text, typ=str): while True: antwort = input("{}: ".format(text)) try: return typ(antwort) except TypeError: print("Falsche Eingabe") continue def parsedn(self, dn): namen = [] if dn.cn: namen.append(x509.NameAttribute(x509.NameOID.COMMON_NAME, dn.cn)) if dn.st: namen.append(x509.NameAttribute(x509.NameOID.STATE_OR_PROVINCE_NAME, dn.st)) if dn.l: namen.append(x509.NameAttribute(x509.NameOID.LOCALITY_NAME, dn.l)) if dn.o: namen.append(x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, dn.o)) if dn.ou: namen.append(x509.NameAttribute(x509.NameOID.ORGANIZATIONAL_UNIT_NAME, dn.ou)) if dn.c: namen.append(x509.NameAttribute(x509.NameOID.COUNTRY_NAME, dn.c)) if dn.street: namen.append(x509.NameAttribute(x509.NameOID.STREET_ADDRESS, dn.street)) return namen def newca(self): privater = self.wrapper.gencert(self.arguments.länge) jetzt = datetime.datetime.now() privater.add_done_callback(print_wrp("Privater Schlüssel generiert")) subject = x509.Name(self.parsedn(self.arguments)) zertifikat = self.wrapper.sign( subject=subject, issuer=subject, time_valid=datetime.timedelta(days=self.arguments.tage), public_key=privater.result().public_key(), privat_ca_key=privater.result() ) passwort = self.frage("Passwort") if passwort: encryption = BestAvailableEncryption(passwort.encode()) else: encryption = NoEncryption() self.db.insert_certificate( self.arguments.cn, hex(zertifikat.serial_number), zertifikat.public_bytes(Encoding.PEM), privater.result().private_bytes(encoding=Encoding.PEM, format=PrivateFormat.PKCS8, encryption_algorithm=encryption), True ) def request(self): erweiterungen = [] if self.arguments.ca: erweiterungen.append(x509.BasicConstraints(True, self.arguments.ca)) else: erweiterungen.append(x509.BasicConstraints(False, None)) if self.arguments.san: alt = [] for san in self.arguments.san: if ":" in san: alt.append(x509.UniformResourceIdentifier(san)) elif "@" in san: alt.append(x509.RFC822Name(san)) else: alt.append(x509.DNSName(san)) erweiterungen.append(x509.SubjectAlternativeName(alt)) ziele = [] if self.arguments.server: ziele.append(x509.oid.ExtendedKeyUsageOID.SERVER_AUTH) if self.arguments.client: ziele.append(x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH) if self.arguments.code: ziele.append(x509.oid.ExtendedKeyUsageOID.CODE_SIGNING) if self.arguments.email: ziele.append(x509.oid.ExtendedKeyUsageOID.EMAIL_PROTECTION) erweiterungen.append(x509.ExtendedKeyUsage(ziele)) anfrage = self.wrapper.request( subject=x509.Name(self.parsedn(self.arguments)), erweiterungen=erweiterungen, certlänge=self.arguments.länge ) passwort = self.frage("Passwort") if passwort: encryption = BestAvailableEncryption(passwort.encode()) else: encryption = NoEncryption() self.db.insert_request( self.arguments.cn, anfrage.public_bytes(Encoding.PEM), privater.result().private_bytes(encoding=Encoding.PEM, format=PrivateFormat.PKCS8, encryption_algorithm=encryption), ) self.connection.commit() def sign(self): def delete_request(subject): self.cursor.execute("DELETE FROM requests WHERE commonname=?", (subject, )) self.cursor.execute("SELECT commonname, csr, privatkey FROM requests") for reihe in self.cursor.fetchall(): try: anfrage = x509.load_pem_x509_csr(reihe["csr"], self.backend) if not anfrage.is_signature_valid: raise TypeError() except TypeError: delete_request(reihe["commonname"]) continue print_info(anfrage) if not self.frage("Möchten Sie das Zertifikat signieren[01]" ,int): delete_request(reihe["commonname"]) continue cas = [] for row in self.cursor.execute("SELECT commonname, publickey, privatkey FROM certificates WHERE ca=1"): cas.append((row["commonname"], x509.load_pem_x509_certificate(row["publickey"], self.backend), row["privatkey"])) print("Wähle die CA:\n") for counter in range(1, len(cas)+1): print("{}: {}".format(counter, cas[counter-1][0])) while True: antwort = self.frage("CA", int) antwort -= 1 if 0 <= antwort < len(cas): break try: privat = load_pem_private_key(cas[antwort][2], None, self.backend) except TypeError: while True: try: pw = self.frage("Passwort") privat = load_pem_private_key(cas[antwort][2], pw.encode("utf-8"), self.backend) except TypeError: print("Falsches Passwort") ca = cas[antwort][1] dn = cas[antwort][0] zertifikat = self.sign_cert( anfrage.subject, ca.subject, datetime.timedelta(days=self.frage("Laufzeit", int)), anfrage.public_key(), privat, [ext.value for ext in anfrage.extensions], ca ) self.db.insert_certificate( reihe["commonname"], hex(zertifikat.serial_number), zertifikat.public_bytes(Encoding.PEM), reihe["privatkey"], dn, 0 ) delete_request(reihe["commonname"]) self.connection.commit() if __name__ == "__main__": parser = parser_factory() argumente = parser.parse_args() print(argumente.aktion) datenbank = sqlite3.connect(argumente.db) initdb(datenbank.cursor()) commandos = Commands(default_backend(), datenbank, argumente) if commandos.get(argumente.aktion): commandos.get(argumente.aktion)() else: parser.print_help()