import binascii import datetime import errno import os import socket import ssl import sys import tempfile import typing import urllib3.connection from cryptography import x509 from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import serialization from urllib3.util.ssl_ import create_urllib3_context from . import oscerr # based on openssl's include/openssl/x509_vfy.h.in X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT = 18 X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN = 19 def create_ssl_context(): """ Create a ssl context with disabled weak crypto. Relatively safe defaults are set in urllib3 already, but we restrict crypto even more. """ ssl_context = create_urllib3_context() ssl_context.options |= ssl.OP_NO_SSLv2 ssl_context.options |= ssl.OP_NO_SSLv3 ssl_context.options |= ssl.OP_NO_TLSv1 ssl_context.options |= ssl.OP_NO_TLSv1_1 return ssl_context class CertVerificationError(oscerr.OscBaseError): def __str__(self): args_str = [str(i) for i in self.args] return "Certificate Verification Error: " + "\n".join(args_str) class TrustedCertStore: def __init__(self, ssl_context, host, port): self.ssl_context = ssl_context self.host = host self.port = port or 443 if not self.host: raise ValueError("Empty `host`") file_name = f"{self.host}_{self.port}" self.dir_path = os.path.expanduser("~/.config/osc/trusted-certs") self.pem_path = os.path.join(self.dir_path, f"{file_name}.pem") if os.path.isfile(self.pem_path): # load permanently trusted certificate that is stored on disk with open(self.pem_path, "rb") as f: self.cert = x509.load_pem_x509_certificate(f.read()) self.ssl_context.load_verify_locations(cafile=self.pem_path) else: self.cert = None def get_server_certificate(self): # The following code throws an exception on self-signed certs, # therefore we need to retrieve the cert differently. # pem = ssl.get_server_certificate((self.host, self.port)) ssl_context = create_ssl_context() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE sock = ssl_context.wrap_socket(socket.socket(), server_hostname=self.host) sock.connect((self.host, self.port)) der = sock.getpeercert(binary_form=True) pem = ssl.DER_cert_to_PEM_cert(der) cert = x509.load_pem_x509_certificate(pem.encode("utf-8")) return cert def trust_permanently(self, cert): """ Permanently trust the certificate. Store it as a pem file in ~/.config/osc/trusted-certs. """ self.cert = cert data = self.cert.public_bytes(serialization.Encoding.PEM) with open(self.pem_path, "wb") as f: f.write(data) self.ssl_context.load_verify_locations(cafile=self.pem_path) def trust_temporarily(self, cert): """ Temporarily trust the certificate. """ self.cert = cert tmp_dir = os.path.expanduser("~/.config/osc") data = self.cert.public_bytes(serialization.Encoding.PEM) with tempfile.NamedTemporaryFile(mode="wb+", dir=tmp_dir, prefix="temp_trusted_cert_") as f: f.write(data) f.flush() self.ssl_context.load_verify_locations(cafile=f.name) @staticmethod def _display_cert(cert): print("Subject:", cert.subject.rfc4514_string()) print("Issuer:", cert.issuer.rfc4514_string()) try: san_ext = cert.extensions.get_extension_for_oid(x509.ExtensionOID.SUBJECT_ALTERNATIVE_NAME) san_ext_value = typing.cast(x509.SubjectAlternativeName, san_ext.value) san_ext_dnsnames = san_ext_value.get_values_for_type(x509.DNSName) except x509.extensions.ExtensionNotFound: san_ext_dnsnames = ["(not available)"] for san in san_ext_dnsnames: print("subjectAltName:", san) print("Valid:", cert.not_valid_before, "->", cert.not_valid_after) print("Fingerprint(MD5):", binascii.hexlify(cert.fingerprint(hashes.MD5())).decode("utf-8")) print("Fingerprint(SHA1):", binascii.hexlify(cert.fingerprint(hashes.SHA1())).decode("utf-8")) def prompt_trust(self, cert, reason): if self.cert: # check if the certificate matches the already trusted certificate for the host and port if cert != self.cert: raise CertVerificationError([ "Remote host identification has changed", "", "WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED!", "IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY!", "", f"Offending certificate is at '{self.pem_path}'" ]) else: # since there is no trusted certificate on disk, # let's display the server cert and give user options to trust it print("The server certificate failed verification") print() self._display_cert(cert) print(f"Reason: {reason}") while True: print(""" Would you like to 0 - quit (default) 1 - continue anyways 2 - trust the server certificate permanently 9 - review the server certificate """) print("Enter choice [0129]: ", end="") r = input() if not r or r == "0": raise CertVerificationError(["Untrusted certificate"]) elif r == "1": self.trust_temporarily(cert) return elif r == "2": self.trust_permanently(cert) return elif r == "9": print(cert.to_txt())