From c3f6f03e3cfc18b278efd71ff9f15c0b5d309092 Mon Sep 17 00:00:00 2001 From: Ludwig Nussel Date: Fri, 13 Nov 2009 10:46:23 +0000 Subject: [PATCH] better ssl certificate verification + now allows to store&compare peer certificate - needs more python hacks --- osc/babysitter.py | 2 +- osc/conf.py | 40 +---- osc/oscssl.py | 225 +++++++++++++++++++++++++++++ osc/{oschttps.py => oscsslexcp.py} | 0 4 files changed, 234 insertions(+), 33 deletions(-) create mode 100644 osc/oscssl.py rename osc/{oschttps.py => oscsslexcp.py} (100%) diff --git a/osc/babysitter.py b/osc/babysitter.py index b822be53..1976c4bd 100644 --- a/osc/babysitter.py +++ b/osc/babysitter.py @@ -7,7 +7,7 @@ import sys import signal from osc import oscerr from urllib2 import URLError, HTTPError -from oschttps import NoSecureSSLError +from oscsslexcp import NoSecureSSLError try: from M2Crypto.SSL.Checker import SSLVerificationError from M2Crypto.SSL import SSLError as SSLError diff --git a/osc/conf.py b/osc/conf.py index bb1378d2..a1d4ee60 100644 --- a/osc/conf.py +++ b/osc/conf.py @@ -36,7 +36,7 @@ The configuration dictionary could look like this: import OscConfigParser from osc import oscerr -from oschttps import NoSecureSSLError +from oscsslexcp import NoSecureSSLError GENERIC_KEYRING = False GNOME_KEYRING = False @@ -276,33 +276,6 @@ def get_apiurl_usr(apiurl): return config['user'] -def verify_cb(ok, store): - # XXX: this is not really smart. It only detects one error. - # Potentially in the chain which is not that useful to the user. - # We should do this after the ssl handshake. - if(not ok): - err = store.get_error() - cert = store.get_current_cert() - print "*** Certificate verify failed (depth=%s) ***" % store.get_error_depth() - print "Subject: ", cert.get_subject() - print "Issuer: ", cert.get_issuer() - print "Fingerprint: ", cert.get_fingerprint() - print "Valid: ", cert.get_not_before(), "-", cert.get_not_after() - try: - import M2Crypto.Err - reason = M2Crypto.Err.get_x509_verify_error(err) - print "Reason: ", reason - except: - pass - while True: - r = raw_input("continue anyways (y/p/N)? ") - if r == 'y': - return 1 - elif r == 'p': - print cert.as_text() - else: - break - return ok def init_basicauth(config): """initialize urllib2 with the credentials for Basic Authentication""" @@ -313,8 +286,11 @@ def init_basicauth(config): if config['api_host_options'][config['apiurl']]['sslcertck']: try: - from M2Crypto import m2urllib2, SSL - except: + import oscssl + oscssl.myHTTPSConnection.appname = 'osc' + from M2Crypto import m2urllib2 + except Exception, e: + print e raise NoSecureSSLError("M2Crypto is needed to access %s in a secure way.\nPlease install python-m2crypto." % config['apiurl']) import urllib2 @@ -367,8 +343,7 @@ def init_basicauth(config): elif os.path.isdir(i): capath = i break - ctx = SSL.Context('sslv3') - ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, depth=9, callback=verify_cb) + ctx = oscssl.mySSLContext() if ctx.load_verify_locations(capath=capath, cafile=cafile) != 1: raise Exception('No CA certificates found') opener = m2urllib2.build_opener(ctx, urllib2.HTTPCookieProcessor(cookiejar), authhandler) else: @@ -681,3 +656,4 @@ def get_config(override_conffile = None, # finally, initialize urllib2 for to use the credentials for Basic Authentication init_basicauth(config) +# vim: sw=4 et diff --git a/osc/oscssl.py b/osc/oscssl.py new file mode 100644 index 00000000..4c77f5c8 --- /dev/null +++ b/osc/oscssl.py @@ -0,0 +1,225 @@ +# Copyright (C) 2009 Novell Inc. +# This program is free software; it may be used, copied, modified +# and distributed under the terms of the GNU General Public Licence, +# either version 2, or (at your option) any later version. + +import M2Crypto.httpslib +from M2Crypto.SSL.Checker import SSLVerificationError +from M2Crypto import m2, SSL + +class TrustedCertStore: + _tmptrusted = {} + + def __init__(self, host, port, app, cert): + + self.cert = cert + self.host = host + if self.host == None: + raise Exception("empty host") + if port: + self.host += "_%d" % port + import os + self.dir = os.path.expanduser('~/.config/%s/trusted-certs' % app) + self.file = self.dir + '/%s.pem' % self.host + + def is_known(self): + if self.host in self._tmptrusted: + return True + + import os + if os.path.exists(self.file): + return True + return False + + def is_trusted(self): + import os + if self.host in self._tmptrusted: + cert = self._tmptrusted[self.host] + else: + if not os.path.exists(self.file): + return False + from M2Crypto import X509 + cert = X509.load_cert(self.file) + if self.cert.as_pem() == cert.as_pem(): + return True + else: + return False + + def trust_tmp(self): + self._tmptrusted[self.host] = self.cert + + def trust_always(self): + self.trust_tmp() + from M2Crypto import X509 + import os + if not os.path.exists(self.dir): + os.makedirs(self.dir) + self.cert.save_pem(self.file) + + +# verify_cb is called for each error once +# we only collect the errors and return suceess +# connection will be aborted later if it needs to +def verify_cb(verrs, ok, store): + + try: + if not ok: + verrs.record(store.get_current_cert(), store.get_error(), store.get_error_depth()) + return 1 + + except Exception, e: + print e + return 0 + +class FailCert: + def __init__(self, cert): + self.cert = cert + self.errs = [] + +class ValidationErrors: + + def __init__(self): + self.chain_ok = True + self.cert_ok = True + self.failures = {} + + def record(self, cert, err, depth): + #print "cert for %s, level %d fail(%d)" % ( cert.get_subject().commonName, depth, err ) + if depth == 0: + self.cert_ok = False + else: + self.chain_ok = False + + if not depth in self.failures: + self.failures[depth] = FailCert(cert) + else: + if self.failures[depth].cert.get_fingerprint() != cert.get_fingerprint(): + raise Exception("Certificate changed unexpectedly. This should not happen") + self.failures[depth].errs.append(err) + + def show(self): + for depth in self.failures.keys(): + cert = self.failures[depth].cert + print "*** certificate verify failed at depth %d" % depth + print "Subject: ", cert.get_subject() + print "Issuer: ", cert.get_issuer() + print "Valid: ", cert.get_not_before(), "-", cert.get_not_after() + print "Fingerprint(MD5): ", cert.get_fingerprint('md5') + print "Fingerprint(SHA1): ", cert.get_fingerprint('sha1') + + for err in self.failures[depth].errs: + reason = "Unknown" + try: + import M2Crypto.Err + reason = M2Crypto.Err.get_x509_verify_error(err) + except: + pass + print "Reason:", reason + + # check if the encountered errors could be ignored + def could_ignore(self): + if not 0 in self.failures: + return True + + from M2Crypto import m2 + nonfatal_errors = [ + m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY, + m2.X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN, + m2.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT, + m2.X509_V_ERR_CERT_UNTRUSTED, + m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE, + + m2.X509_V_ERR_CERT_NOT_YET_VALID, + m2.X509_V_ERR_CERT_HAS_EXPIRED, + m2.X509_V_OK, + ] + + canignore = True + for err in self.failures[0].errs: + if not err in nonfatal_errors: + canignore = False + break + + return canignore + +class mySSLContext(SSL.Context): + + def __init__(self): + SSL.Context.__init__(self, 'sslv23') + self.set_options(m2.SSL_OP_ALL | m2.SSL_OP_NO_SSLv2) # m2crypto does this for us but better safe than sorry + self.verrs = ValidationErrors() + #self.set_info_callback() # debug + self.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, depth=9, callback=lambda ok, store: verify_cb(self.verrs, ok, store)) + + +class myHTTPSConnection(M2Crypto.httpslib.HTTPSConnection): + + appname = 'generic' + + def __init__(self, *args, **kwargs): + M2Crypto.httpslib.origHTTPSConnection.__init__(self, *args, **kwargs) + + def connect(self, *args): + r = M2Crypto.httpslib.origHTTPSConnection.connect(self, *args) + cert = self.sock.get_peer_cert() + if not cert: + self.close() + raise SSLVerificationError("server did not present a certificate") + + # XXX: should be check if the certificate is known anyways? + # Maybe it changed to something valid. + if not self.sock.verify_ok(): + ctx = self.sock.ctx + + tc = TrustedCertStore(self.host, self.port, self.appname, cert) + + if tc.is_known(): + + if tc.is_trusted(): # ok, same cert as the stored one + return + else: + print "WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED!" + print "IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY!" + print "offending certificate is at '%s'" % tc.file + raise SSLVerificationError("remote host identification has changed") + + ctx.verrs.show() + + print + + if not ctx.verrs.could_ignore(): + raise SSLVerificationError("Certificate validation error cannot be ignored") + + if not ctx.verrs.chain_ok: + print "A certificate in the chain failed verification" + if not ctx.verrs.cert_ok: + print "The server certificate failed verification" + + while True: + print """ +Would you like to + 0 - quit (default) + 1 - continue anyways + 2 - trust the server certificate permanently + 9 - review the server certificate + """ + + r = raw_input("Enter choice [0129]: ") + if not r or r == '0': + self.close() + raise SSLVerificationError("Untrusted Certificate") + elif r == '1': + tc.trust_tmp() + return + elif r == '2': + tc.trust_always() + return + elif r == '9': + print cert.as_text() + +# XXX: do we really need to override m2crypto's httpslib to be able +# to check certificates after connect? +M2Crypto.httpslib.origHTTPSConnection = M2Crypto.httpslib.HTTPSConnection +M2Crypto.httpslib.HTTPSConnection = myHTTPSConnection + +# vim: syntax=python sw=4 et diff --git a/osc/oschttps.py b/osc/oscsslexcp.py similarity index 100% rename from osc/oschttps.py rename to osc/oscsslexcp.py