1
0
mirror of https://github.com/openSUSE/osc.git synced 2025-08-23 22:58:53 +02:00

better ssl certificate verification

+ now allows to store&compare peer certificate
- needs more python hacks
This commit is contained in:
Ludwig Nussel
2009-11-13 10:46:23 +00:00
parent bc6f77d1da
commit c3f6f03e3c
4 changed files with 234 additions and 33 deletions

View File

@@ -7,7 +7,7 @@ import sys
import signal import signal
from osc import oscerr from osc import oscerr
from urllib2 import URLError, HTTPError from urllib2 import URLError, HTTPError
from oschttps import NoSecureSSLError from oscsslexcp import NoSecureSSLError
try: try:
from M2Crypto.SSL.Checker import SSLVerificationError from M2Crypto.SSL.Checker import SSLVerificationError
from M2Crypto.SSL import SSLError as SSLError from M2Crypto.SSL import SSLError as SSLError

View File

@@ -36,7 +36,7 @@ The configuration dictionary could look like this:
import OscConfigParser import OscConfigParser
from osc import oscerr from osc import oscerr
from oschttps import NoSecureSSLError from oscsslexcp import NoSecureSSLError
GENERIC_KEYRING = False GENERIC_KEYRING = False
GNOME_KEYRING = False GNOME_KEYRING = False
@@ -276,33 +276,6 @@ def get_apiurl_usr(apiurl):
return config['user'] return config['user']
def verify_cb(ok, store):
# XXX: this is not really smart. It only detects one error.
# Potentially in the chain which is not that useful to the user.
# We should do this after the ssl handshake.
if(not ok):
err = store.get_error()
cert = store.get_current_cert()
print "*** Certificate verify failed (depth=%s) ***" % store.get_error_depth()
print "Subject: ", cert.get_subject()
print "Issuer: ", cert.get_issuer()
print "Fingerprint: ", cert.get_fingerprint()
print "Valid: ", cert.get_not_before(), "-", cert.get_not_after()
try:
import M2Crypto.Err
reason = M2Crypto.Err.get_x509_verify_error(err)
print "Reason: ", reason
except:
pass
while True:
r = raw_input("continue anyways (y/p/N)? ")
if r == 'y':
return 1
elif r == 'p':
print cert.as_text()
else:
break
return ok
def init_basicauth(config): def init_basicauth(config):
"""initialize urllib2 with the credentials for Basic Authentication""" """initialize urllib2 with the credentials for Basic Authentication"""
@@ -313,8 +286,11 @@ def init_basicauth(config):
if config['api_host_options'][config['apiurl']]['sslcertck']: if config['api_host_options'][config['apiurl']]['sslcertck']:
try: try:
from M2Crypto import m2urllib2, SSL import oscssl
except: oscssl.myHTTPSConnection.appname = 'osc'
from M2Crypto import m2urllib2
except Exception, e:
print e
raise NoSecureSSLError("M2Crypto is needed to access %s in a secure way.\nPlease install python-m2crypto." % config['apiurl']) raise NoSecureSSLError("M2Crypto is needed to access %s in a secure way.\nPlease install python-m2crypto." % config['apiurl'])
import urllib2 import urllib2
@@ -367,8 +343,7 @@ def init_basicauth(config):
elif os.path.isdir(i): elif os.path.isdir(i):
capath = i capath = i
break break
ctx = SSL.Context('sslv3') ctx = oscssl.mySSLContext()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, depth=9, callback=verify_cb)
if ctx.load_verify_locations(capath=capath, cafile=cafile) != 1: raise Exception('No CA certificates found') if ctx.load_verify_locations(capath=capath, cafile=cafile) != 1: raise Exception('No CA certificates found')
opener = m2urllib2.build_opener(ctx, urllib2.HTTPCookieProcessor(cookiejar), authhandler) opener = m2urllib2.build_opener(ctx, urllib2.HTTPCookieProcessor(cookiejar), authhandler)
else: else:
@@ -681,3 +656,4 @@ def get_config(override_conffile = None,
# finally, initialize urllib2 for to use the credentials for Basic Authentication # finally, initialize urllib2 for to use the credentials for Basic Authentication
init_basicauth(config) init_basicauth(config)
# vim: sw=4 et

225
osc/oscssl.py Normal file
View File

@@ -0,0 +1,225 @@
# Copyright (C) 2009 Novell Inc.
# This program is free software; it may be used, copied, modified
# and distributed under the terms of the GNU General Public Licence,
# either version 2, or (at your option) any later version.
import M2Crypto.httpslib
from M2Crypto.SSL.Checker import SSLVerificationError
from M2Crypto import m2, SSL
class TrustedCertStore:
_tmptrusted = {}
def __init__(self, host, port, app, cert):
self.cert = cert
self.host = host
if self.host == None:
raise Exception("empty host")
if port:
self.host += "_%d" % port
import os
self.dir = os.path.expanduser('~/.config/%s/trusted-certs' % app)
self.file = self.dir + '/%s.pem' % self.host
def is_known(self):
if self.host in self._tmptrusted:
return True
import os
if os.path.exists(self.file):
return True
return False
def is_trusted(self):
import os
if self.host in self._tmptrusted:
cert = self._tmptrusted[self.host]
else:
if not os.path.exists(self.file):
return False
from M2Crypto import X509
cert = X509.load_cert(self.file)
if self.cert.as_pem() == cert.as_pem():
return True
else:
return False
def trust_tmp(self):
self._tmptrusted[self.host] = self.cert
def trust_always(self):
self.trust_tmp()
from M2Crypto import X509
import os
if not os.path.exists(self.dir):
os.makedirs(self.dir)
self.cert.save_pem(self.file)
# verify_cb is called for each error once
# we only collect the errors and return suceess
# connection will be aborted later if it needs to
def verify_cb(verrs, ok, store):
try:
if not ok:
verrs.record(store.get_current_cert(), store.get_error(), store.get_error_depth())
return 1
except Exception, e:
print e
return 0
class FailCert:
def __init__(self, cert):
self.cert = cert
self.errs = []
class ValidationErrors:
def __init__(self):
self.chain_ok = True
self.cert_ok = True
self.failures = {}
def record(self, cert, err, depth):
#print "cert for %s, level %d fail(%d)" % ( cert.get_subject().commonName, depth, err )
if depth == 0:
self.cert_ok = False
else:
self.chain_ok = False
if not depth in self.failures:
self.failures[depth] = FailCert(cert)
else:
if self.failures[depth].cert.get_fingerprint() != cert.get_fingerprint():
raise Exception("Certificate changed unexpectedly. This should not happen")
self.failures[depth].errs.append(err)
def show(self):
for depth in self.failures.keys():
cert = self.failures[depth].cert
print "*** certificate verify failed at depth %d" % depth
print "Subject: ", cert.get_subject()
print "Issuer: ", cert.get_issuer()
print "Valid: ", cert.get_not_before(), "-", cert.get_not_after()
print "Fingerprint(MD5): ", cert.get_fingerprint('md5')
print "Fingerprint(SHA1): ", cert.get_fingerprint('sha1')
for err in self.failures[depth].errs:
reason = "Unknown"
try:
import M2Crypto.Err
reason = M2Crypto.Err.get_x509_verify_error(err)
except:
pass
print "Reason:", reason
# check if the encountered errors could be ignored
def could_ignore(self):
if not 0 in self.failures:
return True
from M2Crypto import m2
nonfatal_errors = [
m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY,
m2.X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN,
m2.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT,
m2.X509_V_ERR_CERT_UNTRUSTED,
m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE,
m2.X509_V_ERR_CERT_NOT_YET_VALID,
m2.X509_V_ERR_CERT_HAS_EXPIRED,
m2.X509_V_OK,
]
canignore = True
for err in self.failures[0].errs:
if not err in nonfatal_errors:
canignore = False
break
return canignore
class mySSLContext(SSL.Context):
def __init__(self):
SSL.Context.__init__(self, 'sslv23')
self.set_options(m2.SSL_OP_ALL | m2.SSL_OP_NO_SSLv2) # m2crypto does this for us but better safe than sorry
self.verrs = ValidationErrors()
#self.set_info_callback() # debug
self.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, depth=9, callback=lambda ok, store: verify_cb(self.verrs, ok, store))
class myHTTPSConnection(M2Crypto.httpslib.HTTPSConnection):
appname = 'generic'
def __init__(self, *args, **kwargs):
M2Crypto.httpslib.origHTTPSConnection.__init__(self, *args, **kwargs)
def connect(self, *args):
r = M2Crypto.httpslib.origHTTPSConnection.connect(self, *args)
cert = self.sock.get_peer_cert()
if not cert:
self.close()
raise SSLVerificationError("server did not present a certificate")
# XXX: should be check if the certificate is known anyways?
# Maybe it changed to something valid.
if not self.sock.verify_ok():
ctx = self.sock.ctx
tc = TrustedCertStore(self.host, self.port, self.appname, cert)
if tc.is_known():
if tc.is_trusted(): # ok, same cert as the stored one
return
else:
print "WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED!"
print "IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY!"
print "offending certificate is at '%s'" % tc.file
raise SSLVerificationError("remote host identification has changed")
ctx.verrs.show()
print
if not ctx.verrs.could_ignore():
raise SSLVerificationError("Certificate validation error cannot be ignored")
if not ctx.verrs.chain_ok:
print "A certificate in the chain failed verification"
if not ctx.verrs.cert_ok:
print "The server certificate failed verification"
while True:
print """
Would you like to
0 - quit (default)
1 - continue anyways
2 - trust the server certificate permanently
9 - review the server certificate
"""
r = raw_input("Enter choice [0129]: ")
if not r or r == '0':
self.close()
raise SSLVerificationError("Untrusted Certificate")
elif r == '1':
tc.trust_tmp()
return
elif r == '2':
tc.trust_always()
return
elif r == '9':
print cert.as_text()
# XXX: do we really need to override m2crypto's httpslib to be able
# to check certificates after connect?
M2Crypto.httpslib.origHTTPSConnection = M2Crypto.httpslib.HTTPSConnection
M2Crypto.httpslib.HTTPSConnection = myHTTPSConnection
# vim: syntax=python sw=4 et