1
0
mirror of https://github.com/openSUSE/osc.git synced 2024-12-31 20:26:13 +01:00
github.com_openSUSE_osc/osc/oscssl.py
Oleg Girko c62c9f54a0 Add support for TLS SNI if M2Crypto supports it.
Currently osc can't access API URLs which share the same IP address
with other SSL-enabled sites, complaining about certificate
not matching hostname.

This change solves this problem by instructing M2Crypto.SSL.Connection
to send the desired hostname to https server using TLS SNI extension,
thus allowing the server to present the right certificate and choose
the right virtual site.

This is useful for those who can't afford to have a separate IP address
for OBS API.

For TLS SNI to work correctly, M2Crypto should be patched:
https://bugzilla.osafoundation.org/show_bug.cgi?id=13073
Some distributions (like Fedora) already include this patch.

For unpatched M2Crypto osc degrades to operation without TLS SNI.

Signed-off-by: Oleg Girko <ol@infoserver.lv>
2015-08-13 11:11:42 +01:00

373 lines
13 KiB
Python

# Copyright (C) 2009 Novell Inc.
# This program is free software; it may be used, copied, modified
# and distributed under the terms of the GNU General Public Licence,
# either version 2, or (at your option) any later version.
from __future__ import print_function
import M2Crypto.httpslib
from M2Crypto.SSL.Checker import SSLVerificationError
from M2Crypto import m2, SSL
import M2Crypto.m2urllib2
import socket
import sys
try:
from urllib.parse import urlparse, splithost, splitport, splittype
from urllib.request import addinfourl
from http.client import HTTPSConnection
except ImportError:
#python 2.x
from urlparse import urlparse
from urllib import addinfourl, splithost, splitport, splittype
from httplib import HTTPSConnection
from .core import raw_input
class TrustedCertStore:
_tmptrusted = {}
def __init__(self, host, port, app, cert):
self.cert = cert
self.host = host
if self.host == None:
raise Exception("empty host")
if port:
self.host += "_%d" % port
import os
self.dir = os.path.expanduser('~/.config/%s/trusted-certs' % app)
self.file = self.dir + '/%s.pem' % self.host
def is_known(self):
if self.host in self._tmptrusted:
return True
import os
if os.path.exists(self.file):
return True
return False
def is_trusted(self):
import os
if self.host in self._tmptrusted:
cert = self._tmptrusted[self.host]
else:
if not os.path.exists(self.file):
return False
from M2Crypto import X509
cert = X509.load_cert(self.file)
if self.cert.as_pem() == cert.as_pem():
return True
else:
return False
def trust_tmp(self):
self._tmptrusted[self.host] = self.cert
def trust_always(self):
self.trust_tmp()
from M2Crypto import X509
import os
if not os.path.exists(self.dir):
os.makedirs(self.dir)
self.cert.save_pem(self.file)
# verify_cb is called for each error once
# we only collect the errors and return suceess
# connection will be aborted later if it needs to
def verify_cb(ctx, ok, store):
if not ctx.verrs:
ctx.verrs = ValidationErrors()
try:
if not ok:
ctx.verrs.record(store.get_current_cert(), store.get_error(), store.get_error_depth())
return 1
except Exception as e:
print(e, file=sys.stderr)
return 0
class FailCert:
def __init__(self, cert):
self.cert = cert
self.errs = []
class ValidationErrors:
def __init__(self):
self.chain_ok = True
self.cert_ok = True
self.failures = {}
def record(self, cert, err, depth):
#print "cert for %s, level %d fail(%d)" % ( cert.get_subject().commonName, depth, err )
if depth == 0:
self.cert_ok = False
else:
self.chain_ok = False
if not depth in self.failures:
self.failures[depth] = FailCert(cert)
else:
if self.failures[depth].cert.get_fingerprint() != cert.get_fingerprint():
raise Exception("Certificate changed unexpectedly. This should not happen")
self.failures[depth].errs.append(err)
def show(self, out):
for depth in self.failures.keys():
cert = self.failures[depth].cert
print("*** certificate verify failed at depth %d" % depth, file=out)
print("Subject: ", cert.get_subject(), file=out)
print("Issuer: ", cert.get_issuer(), file=out)
print("Valid: ", cert.get_not_before(), "-", cert.get_not_after(), file=out)
print("Fingerprint(MD5): ", cert.get_fingerprint('md5'), file=out)
print("Fingerprint(SHA1): ", cert.get_fingerprint('sha1'), file=out)
for err in self.failures[depth].errs:
reason = "Unknown"
try:
import M2Crypto.Err
reason = M2Crypto.Err.get_x509_verify_error(err)
except:
pass
print("Reason:", reason, file=out)
# check if the encountered errors could be ignored
def could_ignore(self):
if not 0 in self.failures:
return True
nonfatal_errors = [
m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY,
m2.X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN,
m2.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT,
m2.X509_V_ERR_CERT_UNTRUSTED,
m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE,
m2.X509_V_ERR_CERT_NOT_YET_VALID,
m2.X509_V_ERR_CERT_HAS_EXPIRED,
m2.X509_V_OK,
]
canignore = True
for err in self.failures[0].errs:
if not err in nonfatal_errors:
canignore = False
break
return canignore
class mySSLContext(SSL.Context):
def __init__(self):
SSL.Context.__init__(self, 'sslv23')
self.set_options(m2.SSL_OP_NO_SSLv2 | m2.SSL_OP_NO_SSLv3)
self.set_cipher_list("ECDHE-RSA-AES128-SHA256:AES128-GCM-SHA256:RC4:HIGH:!MD5:!aNULL:!EDH")
self.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT)
self.verrs = None
#self.set_info_callback() # debug
self.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, depth=9, callback=lambda ok, store: verify_cb(self, ok, store))
class myHTTPSHandler(M2Crypto.m2urllib2.HTTPSHandler):
handler_order = 499
saved_session = None
def __init__(self, *args, **kwargs):
self.appname = kwargs.pop('appname', 'generic')
M2Crypto.m2urllib2.HTTPSHandler.__init__(self, *args, **kwargs)
# copied from M2Crypto.m2urllib2.HTTPSHandler
# it's sole purpose is to use our myHTTPSHandler/myHTTPSProxyHandler class
# ideally the m2urllib2.HTTPSHandler.https_open() method would be split into
# "do_open()" and "https_open()" so that we just need to override
# the small "https_open()" method...)
def https_open(self, req):
host = req.get_host()
if not host:
raise M2Crypto.m2urllib2.URLError('no host given: ' + req.get_full_url())
# Our change: Check to see if we're using a proxy.
# Then create an appropriate ssl-aware connection.
full_url = req.get_full_url()
target_host = urlparse(full_url)[1]
if (target_host != host):
h = myProxyHTTPSConnection(host = host, appname = self.appname, ssl_context = self.ctx)
# M2Crypto.ProxyHTTPSConnection.putrequest expects a fullurl
selector = full_url
else:
h = myHTTPSConnection(host = host, appname = self.appname, ssl_context = self.ctx)
selector = req.get_selector()
# End our change
h.set_debuglevel(self._debuglevel)
if self.saved_session:
h.set_session(self.saved_session)
headers = dict(req.headers)
headers.update(req.unredirected_hdrs)
# We want to make an HTTP/1.1 request, but the addinfourl
# class isn't prepared to deal with a persistent connection.
# It will try to read all remaining data from the socket,
# which will block while the server waits for the next request.
# So make sure the connection gets closed after the (only)
# request.
headers["Connection"] = "close"
try:
h.request(req.get_method(), selector, req.data, headers)
s = h.get_session()
if s:
self.saved_session = s
r = h.getresponse()
except socket.error as err: # XXX what error?
err.filename = full_url
raise M2Crypto.m2urllib2.URLError(err)
# Pick apart the HTTPResponse object to get the addinfourl
# object initialized properly.
# Wrap the HTTPResponse object in socket's file object adapter
# for Windows. That adapter calls recv(), so delegate recv()
# to read(). This weird wrapping allows the returned object to
# have readline() and readlines() methods.
# XXX It might be better to extract the read buffering code
# out of socket._fileobject() and into a base class.
r.recv = r.read
fp = socket._fileobject(r)
resp = addinfourl(fp, r.msg, req.get_full_url())
resp.code = r.status
resp.msg = r.reason
return resp
class myHTTPSConnection(M2Crypto.httpslib.HTTPSConnection):
def __init__(self, *args, **kwargs):
self.appname = kwargs.pop('appname', 'generic')
M2Crypto.httpslib.HTTPSConnection.__init__(self, *args, **kwargs)
def connect(self, *args):
self.sock = SSL.Connection(self.ssl_ctx)
if self.session:
self.sock.set_session(self.session)
if hasattr(self.sock, 'set_tlsext_host_name'):
self.sock.set_tlsext_host_name(self.host)
self.sock.connect((self.host, self.port))
verify_certificate(self)
def getHost(self):
return self.host
def getPort(self):
return self.port
class myProxyHTTPSConnection(M2Crypto.httpslib.ProxyHTTPSConnection, HTTPSConnection):
def __init__(self, *args, **kwargs):
self.appname = kwargs.pop('appname', 'generic')
M2Crypto.httpslib.ProxyHTTPSConnection.__init__(self, *args, **kwargs)
def _start_ssl(self):
M2Crypto.httpslib.ProxyHTTPSConnection._start_ssl(self)
verify_certificate(self)
def endheaders(self, *args, **kwargs):
if self._proxy_auth is None:
self._proxy_auth = self._encode_auth()
HTTPSConnection.endheaders(self, *args, **kwargs)
# broken in m2crypto: port needs to be an int
def putrequest(self, method, url, skip_host=0, skip_accept_encoding=0):
#putrequest is called before connect, so can interpret url and get
#real host/port to be used to make CONNECT request to proxy
proto, rest = splittype(url)
if proto is None:
raise ValueError("unknown URL type: %s" % url)
#get host
host, rest = splithost(rest)
#try to get port
host, port = splitport(host)
#if port is not defined try to get from proto
if port is None:
try:
port = self._ports[proto]
except KeyError:
raise ValueError("unknown protocol for: %s" % url)
self._real_host = host
self._real_port = int(port)
M2Crypto.httpslib.HTTPSConnection.putrequest(self, method, url, skip_host, skip_accept_encoding)
def getHost(self):
return self._real_host
def getPort(self):
return self._real_port
def verify_certificate(connection):
ctx = connection.sock.ctx
verrs = ctx.verrs
ctx.verrs = None
cert = connection.sock.get_peer_cert()
if not cert:
connection.close()
raise SSLVerificationError("server did not present a certificate")
# XXX: should be check if the certificate is known anyways?
# Maybe it changed to something valid.
if not connection.sock.verify_ok():
tc = TrustedCertStore(connection.getHost(), connection.getPort(), connection.appname, cert)
if tc.is_known():
if tc.is_trusted(): # ok, same cert as the stored one
return
else:
print("WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED!", file=sys.stderr)
print("IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY!", file=sys.stderr)
print("offending certificate is at '%s'" % tc.file, file=sys.stderr)
raise SSLVerificationError("remote host identification has changed")
# if http_debug is set we redirect sys.stdout to an StringIO
# instance in order to do some header filtering (see conf module)
# so we have to use the "original" stdout for printing
out = getattr(connection, '_orig_stdout', sys.stdout)
verrs.show(out)
print(file=out)
if not verrs.could_ignore():
raise SSLVerificationError("Certificate validation error cannot be ignored")
if not verrs.chain_ok:
print("A certificate in the chain failed verification", file=out)
if not verrs.cert_ok:
print("The server certificate failed verification", file=out)
while True:
print("""
Would you like to
0 - quit (default)
1 - continue anyways
2 - trust the server certificate permanently
9 - review the server certificate
""", file=out)
print("Enter choice [0129]: ", end='', file=out)
r = raw_input()
if not r or r == '0':
connection.close()
raise SSLVerificationError("Untrusted Certificate")
elif r == '1':
tc.trust_tmp()
return
elif r == '2':
tc.trust_always()
return
elif r == '9':
print(cert.as_text(), file=out)
# vim: sw=4 et