1
0
mirror of https://github.com/openSUSE/osc.git synced 2024-12-30 19:56:14 +01:00
github.com_openSUSE_osc/osc/oscssl.py
Marcus Huewe ff19a5f539 Fix SSL.Connection instantiation in case of an old M2Crypto
Do not pass a family parameter to SSL.Connection's constructor if it
does not support it. If the family parameter is not supported, we
_try_ to fallback to socket.AF_INET, which is implicitly used by
older versions of the SSL.Connection class.

Fixes: #274 ("osc 0.157 Exception AttributeError: "Connection
instance has no attribute 'ssl_close_flag'"")
2017-03-09 13:34:17 +01:00

409 lines
14 KiB
Python

# Copyright (C) 2009 Novell Inc.
# This program is free software; it may be used, copied, modified
# and distributed under the terms of the GNU General Public Licence,
# either version 2, or (at your option) any later version.
from __future__ import print_function
import M2Crypto.httpslib
from M2Crypto.SSL.Checker import SSLVerificationError
from M2Crypto import m2, SSL
import M2Crypto.m2urllib2
import socket
import sys
import inspect
try:
from urllib.parse import urlparse, splithost, splitport, splittype
from urllib.request import addinfourl
from http.client import HTTPSConnection
except ImportError:
#python 2.x
from urlparse import urlparse
from urllib import addinfourl, splithost, splitport, splittype
from httplib import HTTPSConnection
from .core import raw_input
class TrustedCertStore:
_tmptrusted = {}
def __init__(self, host, port, app, cert):
self.cert = cert
self.host = host
if self.host == None:
raise Exception("empty host")
if port:
self.host += "_%d" % port
import os
self.dir = os.path.expanduser('~/.config/%s/trusted-certs' % app)
self.file = self.dir + '/%s.pem' % self.host
def is_known(self):
if self.host in self._tmptrusted:
return True
import os
if os.path.exists(self.file):
return True
return False
def is_trusted(self):
import os
if self.host in self._tmptrusted:
cert = self._tmptrusted[self.host]
else:
if not os.path.exists(self.file):
return False
from M2Crypto import X509
cert = X509.load_cert(self.file)
if self.cert.as_pem() == cert.as_pem():
return True
else:
return False
def trust_tmp(self):
self._tmptrusted[self.host] = self.cert
def trust_always(self):
self.trust_tmp()
from M2Crypto import X509
import os
if not os.path.exists(self.dir):
os.makedirs(self.dir)
self.cert.save_pem(self.file)
# verify_cb is called for each error once
# we only collect the errors and return suceess
# connection will be aborted later if it needs to
def verify_cb(ctx, ok, store):
if not ctx.verrs:
ctx.verrs = ValidationErrors()
try:
if not ok:
ctx.verrs.record(store.get_current_cert(), store.get_error(), store.get_error_depth())
return 1
except Exception as e:
print(e, file=sys.stderr)
return 0
class FailCert:
def __init__(self, cert):
self.cert = cert
self.errs = []
class ValidationErrors:
def __init__(self):
self.chain_ok = True
self.cert_ok = True
self.failures = {}
def record(self, cert, err, depth):
#print "cert for %s, level %d fail(%d)" % ( cert.get_subject().commonName, depth, err )
if depth == 0:
self.cert_ok = False
else:
self.chain_ok = False
if not depth in self.failures:
self.failures[depth] = FailCert(cert)
else:
if self.failures[depth].cert.get_fingerprint() != cert.get_fingerprint():
raise Exception("Certificate changed unexpectedly. This should not happen")
self.failures[depth].errs.append(err)
def show(self, out):
for depth in self.failures.keys():
cert = self.failures[depth].cert
print("*** certificate verify failed at depth %d" % depth, file=out)
print("Subject: ", cert.get_subject(), file=out)
print("Issuer: ", cert.get_issuer(), file=out)
print("Valid: ", cert.get_not_before(), "-", cert.get_not_after(), file=out)
print("Fingerprint(MD5): ", cert.get_fingerprint('md5'), file=out)
print("Fingerprint(SHA1): ", cert.get_fingerprint('sha1'), file=out)
for err in self.failures[depth].errs:
reason = "Unknown"
try:
import M2Crypto.Err
reason = M2Crypto.Err.get_x509_verify_error(err)
except:
pass
print("Reason:", reason, file=out)
# check if the encountered errors could be ignored
def could_ignore(self):
if not 0 in self.failures:
return True
nonfatal_errors = [
m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY,
m2.X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN,
m2.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT,
m2.X509_V_ERR_CERT_UNTRUSTED,
m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE,
m2.X509_V_ERR_CERT_NOT_YET_VALID,
m2.X509_V_ERR_CERT_HAS_EXPIRED,
m2.X509_V_OK,
]
canignore = True
for err in self.failures[0].errs:
if not err in nonfatal_errors:
canignore = False
break
return canignore
class mySSLContext(SSL.Context):
def __init__(self):
SSL.Context.__init__(self, 'sslv23')
self.set_options(m2.SSL_OP_NO_SSLv2 | m2.SSL_OP_NO_SSLv3)
self.set_cipher_list("ECDHE-RSA-AES128-SHA256:AES128-GCM-SHA256:RC4:HIGH:!MD5:!aNULL:!EDH")
self.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT)
self.verrs = None
#self.set_info_callback() # debug
self.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, depth=9, callback=lambda ok, store: verify_cb(self, ok, store))
class myHTTPSHandler(M2Crypto.m2urllib2.HTTPSHandler):
handler_order = 499
saved_session = None
def __init__(self, *args, **kwargs):
self.appname = kwargs.pop('appname', 'generic')
M2Crypto.m2urllib2.HTTPSHandler.__init__(self, *args, **kwargs)
# copied from M2Crypto.m2urllib2.HTTPSHandler
# it's sole purpose is to use our myHTTPSHandler/myHTTPSProxyHandler class
# ideally the m2urllib2.HTTPSHandler.https_open() method would be split into
# "do_open()" and "https_open()" so that we just need to override
# the small "https_open()" method...)
def https_open(self, req):
host = req.get_host()
if not host:
raise M2Crypto.m2urllib2.URLError('no host given: ' + req.get_full_url())
# Our change: Check to see if we're using a proxy.
# Then create an appropriate ssl-aware connection.
full_url = req.get_full_url()
target_host = urlparse(full_url)[1]
if (target_host != host):
h = myProxyHTTPSConnection(host = host, appname = self.appname, ssl_context = self.ctx)
# M2Crypto.ProxyHTTPSConnection.putrequest expects a fullurl
selector = full_url
else:
h = myHTTPSConnection(host = host, appname = self.appname, ssl_context = self.ctx)
selector = req.get_selector()
# End our change
h.set_debuglevel(self._debuglevel)
if self.saved_session:
h.set_session(self.saved_session)
headers = dict(req.headers)
headers.update(req.unredirected_hdrs)
# We want to make an HTTP/1.1 request, but the addinfourl
# class isn't prepared to deal with a persistent connection.
# It will try to read all remaining data from the socket,
# which will block while the server waits for the next request.
# So make sure the connection gets closed after the (only)
# request.
headers["Connection"] = "close"
try:
h.request(req.get_method(), selector, req.data, headers)
s = h.get_session()
if s:
self.saved_session = s
r = h.getresponse()
except socket.error as err: # XXX what error?
err.filename = full_url
raise M2Crypto.m2urllib2.URLError(err)
# Pick apart the HTTPResponse object to get the addinfourl
# object initialized properly.
# Wrap the HTTPResponse object in socket's file object adapter
# for Windows. That adapter calls recv(), so delegate recv()
# to read(). This weird wrapping allows the returned object to
# have readline() and readlines() methods.
# XXX It might be better to extract the read buffering code
# out of socket._fileobject() and into a base class.
r.recv = r.read
fp = socket._fileobject(r)
resp = addinfourl(fp, r.msg, req.get_full_url())
resp.code = r.status
resp.msg = r.reason
return resp
class myHTTPSConnection(M2Crypto.httpslib.HTTPSConnection):
def __init__(self, *args, **kwargs):
self.appname = kwargs.pop('appname', 'generic')
M2Crypto.httpslib.HTTPSConnection.__init__(self, *args, **kwargs)
def _connect(self, family):
# workaround for old M2Crypto versions where the the
# SSL.Connection.__init__ constructor has no "family" parameter
kwargs = {}
argspec = inspect.getargspec(SSL.Connection.__init__)
if 'family' in argspec.args:
kwargs['family'] = family
elif family != socket.AF_INET:
# old SSL.Connection classes implicitly use socket.AF_INET
return False
self.sock = SSL.Connection(self.ssl_ctx, **kwargs)
if self.session:
self.sock.set_session(self.session)
if hasattr(self.sock, 'set_tlsext_host_name'):
self.sock.set_tlsext_host_name(self.host)
self.sock.connect((self.host, self.port))
return True
def connect(self):
# based on M2Crypto.httpslib.HTTPSConnection.connect
last_exc = None
connected = False
for addrinfo in socket.getaddrinfo(self.host, self.port,
socket.AF_UNSPEC,
socket.SOCK_STREAM,
0, 0):
try:
connected = self._connect(addrinfo[0])
if connected:
break
except socket.error as e:
last_exc = e
finally:
if not connected and self.sock is not None:
self.sock.close()
if not connected:
if last_exc is None:
msg = 'getaddrinfo returned empty list or unsupported families'
raise RuntimeError(msg)
raise last_exc
# ok we are connected, verify cert
verify_certificate(self)
def getHost(self):
return self.host
def getPort(self):
return self.port
class myProxyHTTPSConnection(M2Crypto.httpslib.ProxyHTTPSConnection, HTTPSConnection):
def __init__(self, *args, **kwargs):
self.appname = kwargs.pop('appname', 'generic')
M2Crypto.httpslib.ProxyHTTPSConnection.__init__(self, *args, **kwargs)
def _start_ssl(self):
M2Crypto.httpslib.ProxyHTTPSConnection._start_ssl(self)
verify_certificate(self)
def endheaders(self, *args, **kwargs):
if self._proxy_auth is None:
self._proxy_auth = self._encode_auth()
HTTPSConnection.endheaders(self, *args, **kwargs)
# broken in m2crypto: port needs to be an int
def putrequest(self, method, url, skip_host=0, skip_accept_encoding=0):
#putrequest is called before connect, so can interpret url and get
#real host/port to be used to make CONNECT request to proxy
proto, rest = splittype(url)
if proto is None:
raise ValueError("unknown URL type: %s" % url)
#get host
host, rest = splithost(rest)
#try to get port
host, port = splitport(host)
#if port is not defined try to get from proto
if port is None:
try:
port = self._ports[proto]
except KeyError:
raise ValueError("unknown protocol for: %s" % url)
self._real_host = host
self._real_port = int(port)
M2Crypto.httpslib.HTTPSConnection.putrequest(self, method, url, skip_host, skip_accept_encoding)
def getHost(self):
return self._real_host
def getPort(self):
return self._real_port
def verify_certificate(connection):
ctx = connection.sock.ctx
verrs = ctx.verrs
ctx.verrs = None
cert = connection.sock.get_peer_cert()
if not cert:
connection.close()
raise SSLVerificationError("server did not present a certificate")
# XXX: should be check if the certificate is known anyways?
# Maybe it changed to something valid.
if not connection.sock.verify_ok():
tc = TrustedCertStore(connection.getHost(), connection.getPort(), connection.appname, cert)
if tc.is_known():
if tc.is_trusted(): # ok, same cert as the stored one
return
else:
print("WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED!", file=sys.stderr)
print("IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY!", file=sys.stderr)
print("offending certificate is at '%s'" % tc.file, file=sys.stderr)
raise SSLVerificationError("remote host identification has changed")
# if http_debug is set we redirect sys.stdout to an StringIO
# instance in order to do some header filtering (see conf module)
# so we have to use the "original" stdout for printing
out = getattr(connection, '_orig_stdout', sys.stdout)
verrs.show(out)
print(file=out)
if not verrs.could_ignore():
raise SSLVerificationError("Certificate validation error cannot be ignored")
if not verrs.chain_ok:
print("A certificate in the chain failed verification", file=out)
if not verrs.cert_ok:
print("The server certificate failed verification", file=out)
while True:
print("""
Would you like to
0 - quit (default)
1 - continue anyways
2 - trust the server certificate permanently
9 - review the server certificate
""", file=out)
print("Enter choice [0129]: ", end='', file=out)
r = raw_input()
if not r or r == '0':
connection.close()
raise SSLVerificationError("Untrusted Certificate")
elif r == '1':
tc.trust_tmp()
return
elif r == '2':
tc.trust_always()
return
elif r == '9':
print(cert.as_text(), file=out)
# vim: sw=4 et