diff --git a/ca/myca.py b/ca/myca.py new file mode 100755 index 0000000..ea62bc7 --- /dev/null +++ b/ca/myca.py @@ -0,0 +1,339 @@ +#!/usr/bin/python3 +import argparse +import sqlite3 +from cryptography import x509 +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.serialization import PrivateFormat, NoEncryption, BestAvailableEncryption, load_pem_private_key +from cryptography.x509.oid import NameOID +from cryptography.hazmat.primitives.serialization import Encoding +from concurrent.futures import ThreadPoolExecutor +import datetime + + +def parser_factory(): + parser = argparse.ArgumentParser(description="Programm zur Administration von Zertifikaten") + parser.add_argument("--db", type=str, help="Datenbank die benutzt wird.", default="certificates.sqlite3") + subparser = parser.add_subparsers(dest="aktion", title="Kommandos", ) + newca = subparser.add_parser("newca", help="erstellt ein neues CA-Zertifikat") + request = subparser.add_parser("request", help="erstellt eine Zertifikatsanfrage") + sign = subparser.add_parser("sign", help="TBI Signiert eine Zertifikatsanfrage") + config = subparser.add_parser("config", help="TBI") + for kommando in [ newca, request]: + kommando.add_argument("--length", type=int, help="Schlüssellänge des Zertifikats, sollte ein Exponent von 2 und größer als 1024 sein", default=4096, dest="länge") + kommando.add_argument("--commonname", type=str, help="Name des Besitzers", required=True, dest="cn") + kommando.add_argument("--location", help="Stadt", dest="l") + kommando.add_argument("--state", help="Bundesland/Kanton", dest="st") + kommando.add_argument("--organization", help="Firma", dest="o") + kommando.add_argument("--ou", help="Abteilung", dest="ou") + kommando.add_argument("--country", help="Land", dest="c") + kommando.add_argument("--street", help="Straße", dest="street") + request.add_argument( + "--san", + help="Alternativer Name für die Resource, kann eine Emailadresse, URI oder eine Domain sein. sonderzeichen in Domains sollten bereits mittels IDNA umkodiert worden sein.", + action="append", + dest="san", + nargs="?", + ) + request.add_argument("-s", dest="server", action="store_true", help="Serverzertifikat") + request.add_argument("-cl", dest="client", action="store_true", help="Clientzertifikat") + request.add_argument("-co", dest="code", action="store_true", help="Codesignaturzertifikat") + request.add_argument("-e", dest="email", action="store_true", help="Emailzertifikat") + request.add_argument("--ca", dest="ca", help="Certificate Authority, Zertifizierungstelle mit optionaler Pfadlänge", nargs="?", type=int) + newca.add_argument("tage", type=int, help="Tage die das Zertifikat gültig ist") + return parser + +def initdb(cursor): + cursor.execute("CREATE TABLE IF NOT EXISTS \"certificates\" ( `commonname` TEXT NOT NULL, `serial` TEXT NOT NULL UNIQUE, `publickey` TEXT NOT NULL, `privatkey` TEXT NOT NULL UNIQUE, `parent` integer, `ca` INTEGER NOT NULL, PRIMARY KEY(`commonname`), FOREIGN KEY(`parent`) REFERENCES `certificates`(`serial`) ) WITHOUT ROWID") + cursor.execute("CREATE TABLE IF NOT EXISTS \"requests\" ( `commonname` TEXT NOT NULL, `csr` TEXT NOT NULL UNIQUE, `privatkey` TEXT NOT NULL, PRIMARY KEY(`commonname`) )") + +def print_wrp(text): + def wrapper(*t): + print(text) + return wrapper + +def str_exttype(exttype): + if isinstance(exttype, x509.BasicConstraints): + if not exttype.ca: + return "Keine CA" + if exttype.path_length in [None, 0]: + return "CA, unendliche Pfadlänge" + return "CA, Pfadlänge {}".format(exttype.path_length) + elif isinstance(exttype, x509.ExtendedKeyUsage): + text = "" + for usage in exttype: + text += usage._name + ", " + return text[:-2] + elif isinstance(exttype, x509.SubjectAlternativeName): + text = "" + for an in exttype._general_names: + text += an._value + ", " + return text[:-2] + else: + return repr(exttype) + +def print_info(cert): + for name in cert.subject.rdns: + for oid in name: + print("{}: {}".format(oid._oid._name, oid.value)) + for extension in cert.extensions: + kritisch = "kritisch" + if not extension.critical: + kritisch = "nicht" + kritisch + print("{}({}): {}".format(extension.oid._name, kritisch, str_exttype(extension.value))) + +class Certwrapper: + def __init__(self, backend): + self.threads = ThreadPoolExecutor(3, "cryptoops") + self.anfragetemplate = x509.CertificateSigningRequestBuilder() + self.backend = backend + + def gencert(self, länge): + return self.threads.submit(rsa.generate_private_key, 65537, länge, self.backend) + + def sign(self, subject, issuer, time_valid, public_key, privat_ca_key, extensions=None, ca_key=None): + if not extensions: + extensions = [] + builder = x509.CertificateBuilder() + jetzt = datetime.datetime.now() + builder = builder.serial_number(x509.random_serial_number()) + builder = builder.not_valid_before(jetzt) + builder = builder.not_valid_after(jetzt + time_valid) + builder = builder.subject_name(subject) + builder = builder.issuer_name(issuer) + builder = builder.public_key(public_key) + for extension in extensions: + builder = builder.add_extension(extension, True) + if ca_key: + builder = builder.add_extension(x509.AuthorityKeyIdentifier(ca_key.fingerprint(hashes.SHA512()), None, None), True) + return builder.sign(privat_ca_key, hashes.SHA512(), self.backend) + + def request(self, subject, erweiterungen=[], certlänge=4096): + cert = self.gencert(certlänge) + anfrage = self.anfragetemplate.subject_name(subject) + for erweiterung in erweiterungen: + anfrage = anfrage.add_extension(erweiterung, True) + anfrage = anfrage.sign(cert.result(), hashes.SHA512(), self.backend) + return { + "privat": cert.result(), + "csr": anfrage + } + + +class DBbackend: + def __init__(self, connection): + self.connection = connection + + def insert_certificate(self, commonname: str, serial, publickey, privatkey, ca: bool): + self.connection.execute( + "INSERT INTO `certificates`(commonname, serial, publickey, privatkey, ca) VALUES (?, ?, ?, ?, ?)", + ( + commonname, + serial, + publickey, + privatkey, + ca + ) + ) + self.connection.commit() + + def insert_request(self, commonname, cst, privatkey): + self.connection.execute( + "INSERT INTO `requests`(commonname, csr, privatkey) VALUES (?, ?, ?)", + ( + commonname, + csr, + privatkey, + ) + ) + self.connection.commit() + + +class Commands: + def __init__(self, backend, connection, args): + self.backend = backend + self.threads = ThreadPoolExecutor(3, "cryptoops") + self.connection = connection + self.connection.row_factory = sqlite3.Row + self.cursor = connection.cursor() + self.arguments = args + self.wrapper = Certwrapper(backend) + self.db = DBbackend(connection) + + def get(self, name): + try: + return self.__getattribute__(name) + except (AttributeError, TypeError): + return None + + def fragen(self, fragen, typ=tuple): + antworten = [] + for id in fragen: + antworten.append(typ(id, self.frage(fragen[id], False))) + return antworten + + def frage(self, text, typ=str): + while True: + antwort = input("{}: ".format(text)) + try: + return typ(antwort) + except TypeError: + print("Falsche Eingabe") + continue + + def parsedn(self, dn): + namen = [] + if dn.cn: + namen.append(x509.NameAttribute(x509.NameOID.COMMON_NAME, dn.cn)) + if dn.st: + namen.append(x509.NameAttribute(x509.NameOID.STATE_OR_PROVINCE_NAME, dn.st)) + if dn.l: + namen.append(x509.NameAttribute(x509.NameOID.LOCALITY_NAME, dn.l)) + if dn.o: + namen.append(x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, dn.o)) + if dn.ou: + namen.append(x509.NameAttribute(x509.NameOID.ORGANIZATIONAL_UNIT_NAME, dn.ou)) + if dn.c: + namen.append(x509.NameAttribute(x509.NameOID.COUNTRY_NAME, dn.c)) + if dn.street: + namen.append(x509.NameAttribute(x509.NameOID.STREET_ADDRESS, dn.street)) + return namen + + def newca(self): + privater = self.wrapper.gencert(self.arguments.länge) + jetzt = datetime.datetime.now() + privater.add_done_callback(print_wrp("Privater Schlüssel generiert")) + subject = x509.Name(self.parsedn(self.arguments)) + zertifikat = self.wrapper.sign( + subject=subject, + issuer=subject, + time_valid=datetime.timedelta(days=self.arguments.tage), + public_key=privater.result().public_key(), + privat_ca_key=privater.result() + ) + passwort = self.frage("Passwort") + if passwort: + encryption = BestAvailableEncryption(passwort.encode()) + else: + encryption = NoEncryption() + self.db.insert_certificate( + self.arguments.cn, + hex(zertifikat.serial_number), + zertifikat.public_bytes(Encoding.PEM), + privater.result().private_bytes(encoding=Encoding.PEM, format=PrivateFormat.PKCS8, encryption_algorithm=encryption), + True + ) + + def request(self): + erweiterungen = [] + if self.arguments.ca: + erweiterungen.append(x509.BasicConstraints(True, self.arguments.ca)) + else: + erweiterungen.append(x509.BasicConstraints(False, None)) + if self.arguments.san: + alt = [] + for san in self.arguments.san: + if ":" in san: + alt.append(x509.UniformResourceIdentifier(san)) + elif "@" in san: + alt.append(x509.RFC822Name(san)) + else: + alt.append(x509.DNSName(san)) + erweiterungen.append(x509.SubjectAlternativeName(alt)) + ziele = [] + if self.arguments.server: + ziele.append(x509.oid.ExtendedKeyUsageOID.SERVER_AUTH) + if self.arguments.client: + ziele.append(x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH) + if self.arguments.code: + ziele.append(x509.oid.ExtendedKeyUsageOID.CODE_SIGNING) + if self.arguments.email: + ziele.append(x509.oid.ExtendedKeyUsageOID.EMAIL_PROTECTION) + erweiterungen.append(x509.ExtendedKeyUsage(ziele)) + anfrage = self.wrapper.request( + subject=x509.Name(self.parsedn(self.arguments)), + erweiterungen=erweiterungen, + certlänge=self.arguments.länge + ) + passwort = self.frage("Passwort") + if passwort: + encryption = BestAvailableEncryption(passwort.encode()) + else: + encryption = NoEncryption() + self.db.insert_request( + self.arguments.cn, + anfrage.public_bytes(Encoding.PEM), + privater.result().private_bytes(encoding=Encoding.PEM, format=PrivateFormat.PKCS8, encryption_algorithm=encryption), + ) + self.connection.commit() + + def sign(self): + def delete_request(subject): + self.cursor.execute("DELETE FROM requests WHERE commonname=?", (subject, )) + self.cursor.execute("SELECT commonname, csr, privatkey FROM requests") + for reihe in self.cursor.fetchall(): + try: + anfrage = x509.load_pem_x509_csr(reihe["csr"], self.backend) + if not anfrage.is_signature_valid: + raise TypeError() + except TypeError: + delete_request(reihe["commonname"]) + continue + print_info(anfrage) + if not self.frage("Möchten Sie das Zertifikat signieren[01]" ,int): + delete_request(reihe["commonname"]) + continue + cas = [] + for row in self.cursor.execute("SELECT commonname, publickey, privatkey FROM certificates WHERE ca=1"): + cas.append((row["commonname"], x509.load_pem_x509_certificate(row["publickey"], self.backend), row["privatkey"])) + print("Wähle die CA:\n") + for counter in range(1, len(cas)+1): + print("{}: {}".format(counter, cas[counter-1][0])) + while True: + antwort = self.frage("CA", int) + antwort -= 1 + if 0 <= antwort < len(cas): + break + try: + privat = load_pem_private_key(cas[antwort][2], None, self.backend) + except TypeError: + while True: + try: + pw = self.frage("Passwort") + privat = load_pem_private_key(cas[antwort][2], pw.encode("utf-8"), self.backend) + except TypeError: + print("Falsches Passwort") + ca = cas[antwort][1] + dn = cas[antwort][0] + zertifikat = self.sign_cert( + anfrage.subject, + ca.subject, + datetime.timedelta(days=self.frage("Laufzeit", int)), + anfrage.public_key(), + privat, + [ext.value for ext in anfrage.extensions], + ca + ) + self.db.insert_certificate( + reihe["commonname"], + hex(zertifikat.serial_number), + zertifikat.public_bytes(Encoding.PEM), + reihe["privatkey"], + dn, + 0 + ) + delete_request(reihe["commonname"]) + self.connection.commit() + +if __name__ == "__main__": + parser = parser_factory() + argumente = parser.parse_args() + print(argumente.aktion) + datenbank = sqlite3.connect(argumente.db) + initdb(datenbank.cursor()) + commandos = Commands(default_backend(), datenbank, argumente) + if commandos.get(argumente.aktion): + commandos.get(argumente.aktion)() + else: + parser.print_help() \ No newline at end of file