1
0
mirror of https://github.com/openSUSE/osc.git synced 2025-01-26 22:56:15 +01:00

Switch http_request() to urllib3

Use connection pools for better performance.
Replace M2Crypto with cryptography and urllib3's ssl context.
This commit is contained in:
Daniel Mach 2022-02-03 11:11:40 +01:00
parent d8d4b0831c
commit 93bc0e4731
11 changed files with 907 additions and 998 deletions

View File

@ -30,8 +30,7 @@ jobs:
- 'opensuse/tumbleweed'
# CentOS Stream
# stream9 doesn't contain m2crypto required by osc
# - 'quay.io/centos/centos:stream9'
- 'quay.io/centos/centos:stream9'
# Debian
- 'debian:stable'
@ -50,7 +49,7 @@ jobs:
zypper --non-interactive --gpg-auto-import-keys refresh
zypper --non-interactive dist-upgrade
zypper --non-interactive install git-lfs
zypper --non-interactive install diffstat diffutils python3 python3-chardet python3-M2Crypto python3-pip python3-rpm python3-setuptools
zypper --non-interactive install diffstat diffutils python3 python3-cryptography python3-pip python3-rpm python3-setuptools python3-urllib3
- name: 'Install packages (Fedora/CentOS)'
if: ${{ startsWith(matrix.container, 'fedora:') || contains(matrix.container, 'centos:') }}
@ -58,7 +57,7 @@ jobs:
dnf -y makecache
dnf -y distro-sync
dnf -y install git-lfs
dnf -y install diffstat diffutils python3 python3-chardet python3-m2crypto python3-pip python3-rpm python3-setuptools
dnf -y install diffstat diffutils python3 python3-cryptography python3-pip python3-rpm python3-setuptools python3-urllib3
- name: 'Install packages (Debian/Ubuntu)'
if: ${{ startsWith(matrix.container, 'debian:') || startsWith(matrix.container, 'ubuntu:') }}
@ -66,7 +65,7 @@ jobs:
apt-get -y update
apt-get -y upgrade
apt-get -y --no-install-recommends install git-lfs
apt-get -y --no-install-recommends install diffstat diffutils python3 python3-chardet python3-m2crypto python3-pip python3-rpm python3-setuptools
apt-get -y --no-install-recommends install diffstat diffutils python3 python3-cryptography python3-pip python3-rpm python3-setuptools python3-urllib3
- uses: actions/checkout@v3

View File

@ -8,26 +8,18 @@ from __future__ import print_function
import errno
import os.path
import pdb
import ssl
import sys
import signal
import traceback
from osc import oscerr
from .oscsslexcp import NoSecureSSLError
from .oscssl import CertVerificationError
from osc.util.cpio import CpioError
from osc.util.packagequery import PackageError
from osc.util.helper import decode_it
from osc.OscConfigParser import configparser
try:
from M2Crypto.SSL.Checker import SSLVerificationError
from M2Crypto.SSL import SSLError as SSLError
except:
class SSLError(Exception):
pass
class SSLVerificationError(Exception):
pass
try:
# import as RPMError because the class "error" is too generic
from rpm import error as RPMError
@ -36,13 +28,9 @@ except:
class RPMError(Exception):
pass
try:
from http.client import HTTPException, BadStatusLine
from urllib.error import URLError, HTTPError
except ImportError:
#python 2.x
from httplib import HTTPException, BadStatusLine
from urllib2 import URLError, HTTPError
import urllib3.exceptions
from http.client import HTTPException, BadStatusLine
from urllib.error import URLError, HTTPError
# the good things are stolen from Matt Mackall's mercurial
@ -142,6 +130,10 @@ def run(prg, argv=None):
msg += ' (%s)' % e._osc_host_port
msg += ':\n'
print(msg, e.reason, file=sys.stderr)
except ssl.SSLError as e:
if 'tlsv1' in str(e):
print('The python on this system or the server does not support TLSv1.2', file=sys.stderr)
print("SSL Error:", e, file=sys.stderr)
except IOError as e:
# ignore broken pipe
if e.errno != errno.EPIPE:
@ -182,14 +174,10 @@ def run(prg, argv=None):
print('%s:' % e.fname, e.msg, file=sys.stderr)
except RPMError as e:
print(e, file=sys.stderr)
except SSLError as e:
if 'tlsv1' in str(e):
print('The python on this system does not support TLSv1.2', file=sys.stderr)
print("SSL Error:", e, file=sys.stderr)
except SSLVerificationError as e:
print("Certificate Verification Error:", e, file=sys.stderr)
except NoSecureSSLError as e:
except CertVerificationError as e:
print(e, file=sys.stderr)
except urllib3.exceptions.MaxRetryError as e:
print(e.reason, file=sys.stderr)
except CpioError as e:
print(e, file=sys.stderr)
except oscerr.OscBaseError as e:

View File

@ -35,7 +35,8 @@ except ImportError:
# for 3.9+
from xml.etree import ElementTree as ET
from .conf import config, cookiejar
from . import connection
from .conf import config
from .meter import create_text_meter
@ -1092,7 +1093,7 @@ def main(apiurl, opts, argv):
http_debug = config['http_debug'],
modules = bi.modules,
enable_cpio=not opts.disable_cpio_bulk_download and bi.enable_cpio,
cookiejar=cookiejar,
cookiejar=CookieJarAuthHandler(os.path.expanduser(conf.config["cookiejar"]))._cookiejar,
download_api_only=opts.download_api_only)
if not opts.trust_all_projects:

View File

@ -37,39 +37,19 @@ The configuration dictionary could look like this:
"""
import bz2
import base64
import errno
import os
import re
import sys
import ssl
import getpass
import time
import subprocess
try:
from http.cookiejar import LWPCookieJar, CookieJar
from http.client import HTTPConnection, HTTPResponse
from io import StringIO
from urllib.parse import urlsplit
from urllib.error import URLError
from urllib.request import HTTPBasicAuthHandler, HTTPCookieProcessor, HTTPPasswordMgrWithDefaultRealm, ProxyHandler
from urllib.request import AbstractHTTPHandler, build_opener, proxy_bypass, HTTPSHandler
from urllib.request import BaseHandler, parse_keqv_list, parse_http_list
except ImportError:
#python 2.x
from cookielib import LWPCookieJar, CookieJar
from httplib import HTTPConnection, HTTPResponse
from StringIO import StringIO
from urlparse import urlsplit
from urllib2 import URLError, HTTPBasicAuthHandler, HTTPCookieProcessor, HTTPPasswordMgrWithDefaultRealm, ProxyHandler
from urllib2 import AbstractHTTPHandler, build_opener, proxy_bypass, HTTPSHandler
from urllib2 import BaseHandler, parse_keqv_list, parse_http_list
from io import StringIO
from urllib.error import URLError
from urllib.parse import urlsplit
from . import OscConfigParser
from osc import oscerr
from osc.util.helper import raw_input, decode_it
from .oscsslexcp import NoSecureSSLError
from osc.util.helper import raw_input
from osc import credentials
GENERIC_KEYRING = False
@ -428,8 +408,6 @@ the apiurl \'%s\' does not exist in the config file. Please enter
your credentials for this apiurl.
"""
cookiejar = None
def parse_apisrv_url(scheme, apisrv):
if apisrv.startswith('http://') or apisrv.startswith('https://'):
@ -502,334 +480,6 @@ def get_apiurl_usr(apiurl):
return config['user']
# workaround m2crypto issue:
# if multiple SSL.Context objects are created
# m2crypto only uses the last object which was created.
# So we need to build a new opener everytime we switch the
# apiurl (because different apiurls may have different
# cafile/capath locations)
def _build_opener(apiurl):
from osc.core import __version__
global config
class OscHTTPAuthHandler(HTTPBasicAuthHandler, object):
# python2: inherit from object in order to make it a new-style class
# (HTTPBasicAuthHandler is not a new-style class)
def __init__(self, password_mgr=None, signatureauthhandler=None):
super(self.__class__, self).__init__(password_mgr)
self.signatureauthhandler = signatureauthhandler
def add_parent(self, parent):
super(self.__class__, self).add_parent(parent)
if self.signatureauthhandler:
self.signatureauthhandler.add_parent(parent)
def _rewind_request(self, req):
if hasattr(req.data, 'seek'):
# if the request is issued again (this time with an
# Authorization header), the file's offset has to be
# repositioned to the beginning of the file (otherwise,
# a 0-length body is sent which most likely does not match
# the Content-Length header (if present))
req.data.seek(0)
def http_error_401(self, req, fp, code, msg, headers):
self._rewind_request(req)
authreqs = {}
for authreq in headers.get_all('www-authenticate', []):
scheme = authreq.split()[0].lower()
authreqs[scheme] = authreq
if 'signature' in authreqs \
and self.signatureauthhandler \
and (
# sshkey explicitly set in the config file, use it instead of doing basic auth
self.signatureauthhandler.sshkey_known()
or (
# can't fall-back to basic auth, because server doesn't support it
'basic' not in authreqs
# can't fall-back to basic auth, because there's no password provided
or not self.passwd.find_user_password(None, apiurl)[1]
)):
del headers['www-authenticate']
headers['www-authenticate'] = authreqs['signature']
return self.signatureauthhandler.http_error_401(req, fp, code, msg, headers)
if 'basic' in authreqs:
del headers['www-authenticate']
headers['www-authenticate'] = authreqs['basic']
response = super(self.__class__, self).http_error_401(req, fp, code, msg, headers)
# workaround for http://bugs.python.org/issue9639
if hasattr(self, 'retried'):
self.retried = 0
return response
class OscHTTPSignatureAuthHandler(BaseHandler, object):
def __init__(self, user, sshkey):
super(self.__class__, self).__init__()
self.user = user
self.sshkey = sshkey
def list_ssh_agent_keys(self):
cmd = ['ssh-add', '-l']
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, _ = proc.communicate()
if proc.returncode == 0 and stdout.strip():
return [self.get_fingerprint(line) for line in stdout.splitlines()]
else:
return []
def is_ssh_private_keyfile(self, keyfile_path):
if not os.path.isfile(keyfile_path):
return False
with open(keyfile_path, "r") as f:
try:
line = f.readline(100).strip()
except UnicodeDecodeError:
# skip binary files
return False
if line == "-----BEGIN RSA PRIVATE KEY-----":
return True
if line == "-----BEGIN OPENSSH PRIVATE KEY-----":
return True
return False
def is_ssh_public_keyfile(self, keyfile_path):
if not os.path.isfile(keyfile_path):
return False
return keyfile_path.endswith(".pub")
@staticmethod
def get_fingerprint(line):
parts = line.strip().split(b" ")
if len(parts) < 2:
raise ValueError("Unable to retrieve ssh key fingerprint from line: {}".format(line))
return parts[1]
def list_ssh_dir_keys(self):
sshdir = os.path.expanduser('~/.ssh')
keys_in_home_ssh = {}
for keyfile in os.listdir(sshdir):
if keyfile.startswith(("agent-", "authorized_keys", "config", "known_hosts")):
# skip files that definitely don't contain keys
continue
keyfile_path = os.path.join(sshdir, keyfile)
# public key alone may be sufficient because the private key
# can get loaded into ssh-agent from gpg (yubikey works this way)
is_public = self.is_ssh_public_keyfile(keyfile_path)
# skip private detection if we think the key is a public one already
is_private = False if is_public else self.is_ssh_private_keyfile(keyfile_path)
if not is_public and not is_private:
continue
cmd = ["ssh-keygen", "-lf", keyfile_path]
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, _ = proc.communicate()
if proc.returncode == 0:
fingerprint = self.get_fingerprint(stdout)
if fingerprint and (fingerprint not in keys_in_home_ssh or is_private):
# prefer path to a private key
keys_in_home_ssh[fingerprint] = keyfile_path
return keys_in_home_ssh
def guess_keyfile(self):
keys_in_agent = self.list_ssh_agent_keys()
if keys_in_agent:
keys_in_home_ssh = self.list_ssh_dir_keys()
for fingerprint in keys_in_agent:
if fingerprint in keys_in_home_ssh:
return keys_in_home_ssh[fingerprint]
sshdir = os.path.expanduser('~/.ssh')
keyfiles = ('id_ed25519', 'id_ed25519_sk', 'id_rsa', 'id_ecdsa', 'id_ecdsa_sk', 'id_dsa')
for keyfile in keyfiles:
keyfile_path = os.path.join(sshdir, keyfile)
if os.path.isfile(keyfile_path):
return keyfile_path
raise oscerr.OscIOError(None, 'could not guess ssh identity keyfile')
def ssh_sign(self, data, namespace, keyfile=None):
try:
data = bytes(data, 'utf-8')
except:
pass
if not keyfile:
keyfile = self.guess_keyfile()
else:
if '/' not in keyfile:
keyfile = '~/.ssh/' + keyfile
keyfile = os.path.expanduser(keyfile)
cmd = ['ssh-keygen', '-Y', 'sign', '-f', keyfile, '-n', namespace, '-q']
proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
stdout, _ = proc.communicate(data)
if proc.returncode:
raise oscerr.OscIOError(None, 'ssh-keygen signature creation failed: %d' % proc.returncode)
signature = decode_it(stdout)
match = re.match(r"\A-----BEGIN SSH SIGNATURE-----\n(.*)\n-----END SSH SIGNATURE-----", signature, re.S)
if not match:
raise oscerr.OscIOError(None, 'could not extract ssh signature')
return base64.b64decode(match.group(1))
def get_authorization(self, req, chal):
realm = chal.get('realm', '')
now = int(time.time())
sigdata = "(created): %d" % now
signature = self.ssh_sign(sigdata, realm, self.sshkey)
signature = decode_it(base64.b64encode(signature))
return 'keyId="%s",algorithm="ssh",headers="(created)",created=%d,signature="%s"' \
% (self.user, now, signature)
def retry_http_signature_auth(self, req, auth):
old_auth_val = req.get_header('Authorization', None)
if old_auth_val:
old_scheme = old_auth_val.split()[0]
if old_scheme.lower() == 'signature':
return None
token, challenge = auth.split(' ', 1)
chal = parse_keqv_list(filter(None, parse_http_list(challenge)))
auth = self.get_authorization(req, chal)
if auth:
auth_val = 'Signature %s' % auth
req.add_unredirected_header('Authorization', auth_val)
return self.parent.open(req, timeout=req.timeout)
def http_error_401(self, req, fp, code, msg, headers):
authreq = headers.get('www-authenticate', None)
if authreq:
scheme = authreq.split()[0]
if scheme.lower() == 'signature':
return self.retry_http_signature_auth(req, authreq)
raise ValueError("OscHTTPSignatureAuthHandler does not support"
" the following scheme: '%s'" % scheme)
def sshkey_known(self):
return self.sshkey is not None
if 'last_opener' not in _build_opener.__dict__:
_build_opener.last_opener = (None, None)
if apiurl == _build_opener.last_opener[0]:
return _build_opener.last_opener[1]
# respect no_proxy env variable
if proxy_bypass(apiurl):
# initialize with empty dict
proxyhandler = ProxyHandler({})
else:
# read proxies from env
proxyhandler = ProxyHandler()
options = config['api_host_options'][apiurl]
signatureauthhandler = OscHTTPSignatureAuthHandler(options['user'], options['sshkey'])
# with None as first argument, it will always use this username/password
# combination for urls for which arg2 (apisrv) is a super-url
authhandler = OscHTTPAuthHandler(HTTPPasswordMgrWithDefaultRealm(), signatureauthhandler)
authhandler.add_password(None, apiurl, options['user'], options['pass'])
if options['sslcertck']:
try:
from . import oscssl
from M2Crypto import m2urllib2
except ImportError as e:
print(e)
raise NoSecureSSLError('M2Crypto is needed to access %s in a secure way.\nPlease install python-m2crypto.' % apiurl)
cafile = options.get('cafile', None)
capath = options.get('capath', None)
if not cafile and not capath:
for i in ['/etc/pki/tls/cert.pem', '/etc/ssl/certs']:
if os.path.isfile(i):
cafile = i
break
elif os.path.isdir(i):
capath = i
break
if not cafile and not capath:
raise oscerr.OscIOError(None, 'No CA certificates found. (You may want to install ca-certificates-mozilla package)')
ctx = oscssl.mySSLContext()
if ctx.load_verify_locations(capath=capath, cafile=cafile) != 1:
raise oscerr.OscIOError(None, 'No CA certificates found. (You may want to install ca-certificates-mozilla package)')
opener = m2urllib2.build_opener(ctx, oscssl.myHTTPSHandler(ssl_context=ctx, appname='osc'), HTTPCookieProcessor(cookiejar), authhandler, proxyhandler)
else:
handlers = [HTTPCookieProcessor(cookiejar), authhandler, proxyhandler]
try:
# disable ssl cert check in python >= 2.7.9
ctx = ssl._create_unverified_context()
handlers.append(HTTPSHandler(context=ctx))
except AttributeError:
pass
print("WARNING: SSL certificate checks disabled. Connection is insecure!\n", file=sys.stderr)
opener = build_opener(*handlers)
opener.addheaders = [('User-agent', 'osc/%s' % __version__)]
_build_opener.last_opener = (apiurl, opener)
return opener
def init_basicauth(config, config_mtime):
"""initialize urllib2 with the credentials for Basic Authentication"""
def filterhdrs(meth, ishdr, *hdrs):
# this is so ugly but httplib doesn't use
# a logger object or such
def new_method(self, *args, **kwargs):
# check if this is a recursive call (note: we do not
# have to care about thread safety)
is_rec_call = getattr(self, '_orig_stdout', None) is not None
try:
if not is_rec_call:
self._orig_stdout = sys.stdout
sys.stdout = StringIO()
meth(self, *args, **kwargs)
hdr = sys.stdout.getvalue()
finally:
# restore original stdout
if not is_rec_call:
sys.stdout = self._orig_stdout
del self._orig_stdout
for i in hdrs:
if ishdr:
hdr = re.sub(r'%s:[^\\r]*\\r\\n' % i, '', hdr)
else:
hdr = re.sub(i, '', hdr)
sys.stdout.write(hdr)
new_method.__name__ = meth.__name__
return new_method
if config['http_debug'] and not config['http_full_debug']:
HTTPConnection.send = filterhdrs(HTTPConnection.send, True, 'Cookie', 'Authorization')
HTTPResponse.begin = filterhdrs(HTTPResponse.begin, False, 'header: Set-Cookie.*\n')
if config['http_debug']:
# brute force
def urllib2_debug_init(self, debuglevel=0):
self._debuglevel = 1
AbstractHTTPHandler.__init__ = urllib2_debug_init
cookie_file = os.path.expanduser(config['cookiejar'])
if not os.path.exists(os.path.dirname(cookie_file)):
os.makedirs(os.path.dirname(cookie_file), mode=0o700)
global cookiejar
cookiejar = LWPCookieJar(cookie_file)
try:
cookiejar.load(ignore_discard=True)
if int(round(config_mtime)) > int(os.stat(cookie_file).st_mtime):
cookiejar.clear()
cookiejar.save()
except IOError:
try:
fd = os.open(cookie_file, os.O_CREAT | os.O_WRONLY | os.O_TRUNC, 0o600)
os.close(fd)
except IOError:
# hmm is any good reason why we should catch the IOError?
#print 'Unable to create cookiejar file: \'%s\'. Using RAM-based cookies.' % cookie_file
cookiejar = CookieJar()
def get_configParser(conffile=None, force_read=False):
"""
Returns an ConfigParser() object. After its first invocation the
@ -1246,8 +896,10 @@ def get_config(override_conffile=None,
e.file = conffile
raise e
# finally, initialize urllib2 for to use the credentials for Basic Authentication
init_basicauth(config, os.stat(conffile).st_mtime)
# enable connection debugging after all config options are set
from .connection import enable_http_debug
enable_http_debug(config)
def identify_conf():
# needed for compat reasons(users may have their oscrc still in ~

613
osc/connection.py Normal file
View File

@ -0,0 +1,613 @@
import base64
import errno
import os
import re
import subprocess
import ssl
import sys
import time
import http.client
import http.cookiejar
import urllib.parse
import urllib.request
import urllib3.exceptions
import urllib3.poolmanager
import urllib3.response
import urllib3.util
from . import conf
from . import oscerr
from . import oscssl
from .util.helper import decode_it
class MockRequest:
"""
Mock a request object for `cookiejar.extract_cookies()`
and `cookiejar.add_cookie_header()`.
"""
def __init__(self, url, headers):
self.url = url
self.headers = headers
self.unverifiable = False
self.type = "https"
def get_full_url(self):
return self.url
def get_header(self, header_name, default=None):
return self.headers.get(header_name, default)
def has_header(self, header_name):
return (header_name in self.headers)
def add_unredirected_header(self, key, val):
# modifies the `headers` variable that was passed to object's constructor
self.headers[key] = val
def enable_http_debug(config):
if not int(config["http_debug"]) and not int(config["http_full_debug"]):
return
# HACK: override HTTPResponse's init to increase debug level
old_HTTPResponse__init__ = http.client.HTTPResponse.__init__
def new_HTTPResponse__init__(self, *args, **kwargs):
old_HTTPResponse__init__(self, *args, **kwargs)
self.debuglevel = 1
http.client.HTTPResponse.__init__ = new_HTTPResponse__init__
# increase HTTPConnection debug level
http.client.HTTPConnection.debuglevel = 1
# HACK: because HTTPResponse's debug data uses print(),
# let's inject custom print() function to that module
def new_print(*args, file=None):
if not int(config["http_full_debug"]) and args:
# hide private data (authorization and cookies) when full debug is not enabled
if args[:2] == ("header:", "Set-Cookie:"):
return
if args[0] == "send:":
args = list(args)
# (?<=...) - '...' must be present before the pattern (positive lookbehind assertion)
args[1] = re.sub(r"(?<=\\r\\n)authorization:.*?\\r\\n", "", args[1], re.I)
args[1] = re.sub(r"(?<=\\r\\n)Cookie:.*?\\r\\n", "", args[1], re.I)
print("DEBUG:", *args, file=sys.stderr)
http.client.print = new_print
def get_proxy_manager(env):
proxy_url = os.environ.get(env, None)
if not proxy_url:
return
proxy_purl = urllib3.util.parse_url(proxy_url)
# rebuild proxy url in order to remove auth because ProxyManager would fail on it
if proxy_purl.port:
proxy_url = f"{proxy_purl.scheme}://{proxy_purl.host}:{proxy_purl.port}"
else:
proxy_url = f"{proxy_purl.scheme}://{proxy_purl.host}"
# import osc.core here to avoid cyclic imports
from . import core
proxy_headers = urllib3.make_headers(
proxy_basic_auth=proxy_purl.auth,
user_agent=f"osc/{core.__version__}",
)
manager = urllib3.ProxyManager(proxy_url, proxy_headers=proxy_headers)
return manager
# Instantiate on first use in `http_request()`.
# Each `apiurl` requires a differently configured pool
# (incl. trusted keys for example).
CONNECTION_POOLS = {}
# Pool manager for requests outside apiurls.
POOL_MANAGER = urllib3.PoolManager()
# Proxy manager for HTTP connections.
HTTP_PROXY_MANAGER = get_proxy_manager("HTTP_PROXY")
# Proxy manager for HTTPS connections.
HTTPS_PROXY_MANAGER = get_proxy_manager("HTTPS_PROXY")
def http_request(method, url, headers=None, data=None, file=None):
"""
Send a HTTP request to a server.
Features:
* Authentication ([apiurl]/{user,pass} in oscrc)
* Session cookie support (~/.local/state/osc/cookiejar)
* SSL certificate verification incl. managing trusted certs
* SSL certificate verification bypass (if [apiurl]/sslcertck=0 in oscrc)
* Expired SSL certificates are no longer accepted. Either prolong them or set sslcertck=0.
* Proxy support (HTTPS_PROXY env, NO_PROXY is respected)
* Retries (http_retries in oscrc)
* Requests outside apiurl (incl. proxy support)
* Connection debugging (-H/--http-debug, --http-full-debug)
:param method: HTTP request method (such as GET, POST, PUT, DELETE).
:param url: The URL to perform the request on.
:param headers: Dictionary of custom headers to send.
:param data: Data to send in the request body (conflicts with `file`).
:param file: Path to a file to send as data in the request body (conflicts with `data`).
"""
# import osc.core here to avoid cyclic imports
from . import core
purl = urllib3.util.parse_url(url)
apiurl = conf.extract_known_apiurl(url)
headers = urllib3.response.HTTPHeaderDict(headers or {})
# identify osc
headers.update(urllib3.make_headers(user_agent=f"osc/{core.__version__}"))
if data and file:
raise RuntimeError('Specify either `data` or `file`')
elif data:
if hasattr(data, "encode"):
data = data.encode("utf-8")
content_length = len(data)
elif file:
content_length = os.path.getsize(file)
data = open(file, "rb")
else:
content_length = 0
if content_length:
headers.add("Content-Length", str(content_length))
# handle requests that go outside apiurl
# do not set auth cookie or auth credentials
if not apiurl:
if purl.scheme == "http" and HTTP_PROXY_MANAGER and not urllib.request.proxy_bypass(url):
# connection through proxy
manager = HTTP_PROXY_MANAGER
elif purl.scheme == "https" and HTTPS_PROXY_MANAGER and not urllib.request.proxy_bypass(url):
# connection through proxy
manager = HTTPS_PROXY_MANAGER
else:
# direct connection
manager = POOL_MANAGER
response = manager.urlopen(method, url, body=data, headers=headers, preload_content=False)
if response.status / 100 != 2:
raise urllib.error.HTTPError(url, response.status, response.reason, response.headers, response)
return response
options = conf.config["api_host_options"][apiurl]
global CONNECTION_POOLS
pool = CONNECTION_POOLS.get(apiurl, None)
if not pool:
pool_kwargs = {}
pool_kwargs["retries"] = int(conf.config["http_retries"])
if purl.scheme == "https":
ssl_context = oscssl.create_ssl_context()
ssl_context.load_default_certs()
# turn cert verification off if sslcertck = 0
pool_kwargs["cert_reqs"] = "CERT_REQUIRED" if options["sslcertck"] else "CERT_NONE"
pool_kwargs["ssl_context"] = ssl_context
if purl.scheme == "http" and HTTP_PROXY_MANAGER and not urllib.request.proxy_bypass(url):
# connection through HTTP proxy
pool = HTTP_PROXY_MANAGER.connection_from_host(
host=purl.host,
port=purl.port,
scheme=purl.scheme,
pool_kwargs=pool_kwargs
)
HTTP_PROXY_MANAGER.request('GET', url)
elif purl.scheme == "https" and HTTPS_PROXY_MANAGER and not urllib.request.proxy_bypass(url):
# connection through HTTPS proxy
pool = HTTPS_PROXY_MANAGER.connection_from_host(
host=purl.host,
port=purl.port,
scheme=purl.scheme,
pool_kwargs=pool_kwargs
)
elif purl.scheme == "https":
# direct connection
pool = urllib3.HTTPSConnectionPool(host=purl.host, port=purl.port, **pool_kwargs)
else:
pool = urllib3.HTTPConnectionPool(host=purl.host, port=purl.port, **pool_kwargs)
if purl.scheme == "https":
# inject ssl context instance into pool so we can use it later
pool.ssl_context = ssl_context
# inject trusted cert store instance into pool so we can use it later
pool.trusted_cert_store = oscssl.TrustedCertStore(ssl_context, purl.host, purl.port)
CONNECTION_POOLS[apiurl] = pool
auth_handlers = [
CookieJarAuthHandler(os.path.expanduser(conf.config["cookiejar"])),
SignatureAuthHandler(options["user"], options["sshkey"], options["pass"]),
BasicAuthHandler(options["user"], options["pass"]),
]
for handler in auth_handlers:
# authenticate using a cookie (if available)
success = handler.set_request_headers(url, headers)
if success:
break
if data or file:
# osc/obs data is usually XML
headers.add("Content-Type", "application/xml; charset=utf-8")
if purl.scheme == "http" and HTTP_PROXY_MANAGER:
# HTTP proxy requires full URL with 'same host' checking off
urlopen_url = url
assert_same_host = False
else:
# everything else is fine with path only
# join path and query, ignore the remaining args; args are (scheme, netloc, path, query, fragment)
urlopen_url = urllib.parse.urlunsplit(("", "", purl.path, purl.query, ""))
assert_same_host = True
if int(conf.config['http_debug']):
# use the hacked print() for consistency
http.client.print(40 * '-')
http.client.print(method, url)
try:
response = pool.urlopen(
method, urlopen_url, body=data, headers=headers,
preload_content=False, assert_same_host=assert_same_host
)
except urllib3.exceptions.MaxRetryError as e:
if not isinstance(e.reason, urllib3.exceptions.SSLError):
# re-raise exceptions that are not related to SSL
raise
if isinstance(e.reason.args[0], ssl.SSLCertVerificationError):
self_signed_verify_codes = (
oscssl.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT,
oscssl.X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN,
)
if e.reason.args[0].verify_code not in self_signed_verify_codes:
# re-raise ssl exceptions that are not related to self-signed certs
raise e.reason.args[0] from None
else:
# re-raise other than ssl exceptions
raise e.reason.args[0] from None
# get the untrusted certificated from server
cert = pool.trusted_cert_store.get_server_certificate()
# prompt user if we should trust the certificate
pool.trusted_cert_store.prompt_trust(cert, reason=e.reason)
response = pool.urlopen(
method, urlopen_url, body=data, headers=headers,
preload_content=False, assert_same_host=assert_same_host
)
if response.status == 401:
# session cookie has expired, re-authenticate
for handler in auth_handlers:
success = handler.set_request_headers_after_401(url, headers, response)
if success:
break
response = pool.urlopen(method, urlopen_url, body=data, headers=headers, preload_content=False)
if response.status / 100 != 2:
raise urllib.error.HTTPError(url, response.status, response.reason, response.headers, response)
for handler in auth_handlers:
handler.process_response(url, headers, response)
return response
# pylint: disable=C0103,C0116
def http_GET(*args, **kwargs):
return http_request("GET", *args, **kwargs)
# pylint: disable=C0103,C0116
def http_POST(*args, **kwargs):
return http_request("POST", *args, **kwargs)
# pylint: disable=C0103,C0116
def http_PUT(*args, **kwargs):
return http_request("PUT", *args, **kwargs)
# pylint: disable=C0103,C0116
def http_DELETE(*args, **kwargs):
return http_request("DELETE", *args, **kwargs)
class AuthHandlerBase:
def _get_auth_schemes(self, response):
"""
Extract all `www-authenticate` headers from `response` and return them
in a dictionary: `{scheme: auth_method}`.
"""
result = {}
for auth_method in response.headers.get_all("www-authenticate", []):
scheme = auth_method.split()[0].lower()
result[scheme] = auth_method
return result
def set_request_headers(self, url, request_headers):
"""
Modify request headers with auth headers.
:param url: Request URL provides context for `request_headers` modifications
:type url: str
:param request_headers: object to be modified
:type request_headers: urllib3.response.HTTPHeaderDict
:return: `True` on if `request_headers` was modified, `False` otherwise
"""
raise NotImplementedError
def set_request_headers_after_401(self, url, request_headers, response):
"""
Modify request headers with auth headers after getting 401 response.
:param url: Request URL provides context for `request_headers` modifications
:type url: str
:param request_headers: object to be modified
:type request_headers: urllib3.response.HTTPHeaderDict
:param response: Response object provides context for `request_headers` modifications
:type response: urllib3.response.HTTPResponse
:return: `True` on if `request_headers` was modified, `False` otherwise
"""
raise NotImplementedError
def process_response(self, url, request_headers, response):
"""
Retrieve data from response, save cookies, etc.
:param url: Request URL provides context for `request_headers` modifications
:type url: str
:param request_headers: object to be modified
:type request_headers: urllib3.response.HTTPHeaderDict
:param response: Response object provides context for `request_headers` modifications
:type response: urllib3.response.HTTPResponse
"""
raise NotImplementedError
class CookieJarAuthHandler(AuthHandlerBase):
# Shared among instances, instantiate on first use, key equals too cookiejar path.
COOKIEJARS = {}
def __init__(self, cookiejar_path):
self.cookiejar_path = cookiejar_path
@property
def _cookiejar(self):
jar = self.COOKIEJARS.get(self.cookiejar_path, None)
if not jar:
try:
os.makedirs(os.path.dirname(self.cookiejar_path), mode=0o700)
except OSError as e:
if e.errno != errno.EEXIST:
raise
jar = http.cookiejar.LWPCookieJar(self.cookiejar_path)
if os.path.isfile(self.cookiejar_path):
jar.load()
self.COOKIEJARS[self.cookiejar_path] = jar
return jar
def set_request_headers(self, url, request_headers):
self._cookiejar.add_cookie_header(MockRequest(url, request_headers))
return bool(request_headers.get_all("cookie", None))
def set_request_headers_after_401(self, url, request_headers, response):
# can't do anything, we have tried setting a cookie already
return False
def process_response(self, url, request_headers, response):
self._cookiejar.extract_cookies(response, MockRequest(url, response.headers))
self._cookiejar.save()
class BasicAuthHandler(AuthHandlerBase):
def __init__(self, user, password):
self.user = user
self.password = password
def set_request_headers(self, url, request_headers):
return False
def set_request_headers_after_401(self, url, request_headers, response):
auth_schemes = self._get_auth_schemes(response)
if "basic" not in auth_schemes:
return False
if not self.user or not self.password:
return False
request_headers.update(urllib3.make_headers(basic_auth=f"{self.user}:{self.password}"))
return True
def process_response(self, url, request_headers, response):
pass
class SignatureAuthHandler(AuthHandlerBase):
def __init__(self, user, sshkey, basic_auth_password=None):
self.user = user
self.sshkey = sshkey
# value of `basic_auth_password` is only used as a hint if we should skip signature auth
self.basic_auth_password = bool(basic_auth_password)
def list_ssh_agent_keys(self):
cmd = ['ssh-add', '-l']
try:
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except OSError:
# ssh-add is not available
return []
stdout, _ = proc.communicate()
if proc.returncode == 0 and stdout.strip():
return [self.get_fingerprint(line) for line in stdout.splitlines()]
else:
return []
def is_ssh_private_keyfile(self, keyfile_path):
if not os.path.isfile(keyfile_path):
return False
with open(keyfile_path, "r") as f:
try:
line = f.readline(100).strip()
except UnicodeDecodeError:
# skip binary files
return False
if line == "-----BEGIN RSA PRIVATE KEY-----":
return True
if line == "-----BEGIN OPENSSH PRIVATE KEY-----":
return True
return False
def is_ssh_public_keyfile(self, keyfile_path):
if not os.path.isfile(keyfile_path):
return False
return keyfile_path.endswith(".pub")
@staticmethod
def get_fingerprint(line):
parts = line.strip().split(b" ")
if len(parts) < 2:
raise ValueError("Unable to retrieve ssh key fingerprint from line: {}".format(line))
return parts[1]
def list_ssh_dir_keys(self):
sshdir = os.path.expanduser('~/.ssh')
keys_in_home_ssh = {}
for keyfile in os.listdir(sshdir):
if keyfile.startswith(("agent-", "authorized_keys", "config", "known_hosts")):
# skip files that definitely don't contain keys
continue
keyfile_path = os.path.join(sshdir, keyfile)
# public key alone may be sufficient because the private key
# can get loaded into ssh-agent from gpg (yubikey works this way)
is_public = self.is_ssh_public_keyfile(keyfile_path)
# skip private detection if we think the key is a public one already
is_private = False if is_public else self.is_ssh_private_keyfile(keyfile_path)
if not is_public and not is_private:
continue
cmd = ["ssh-keygen", "-lf", keyfile_path]
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, _ = proc.communicate()
if proc.returncode == 0:
fingerprint = self.get_fingerprint(stdout)
if fingerprint and (fingerprint not in keys_in_home_ssh or is_private):
# prefer path to a private key
keys_in_home_ssh[fingerprint] = keyfile_path
return keys_in_home_ssh
def guess_keyfile(self):
keys_in_agent = self.list_ssh_agent_keys()
if keys_in_agent:
keys_in_home_ssh = self.list_ssh_dir_keys()
for fingerprint in keys_in_agent:
if fingerprint in keys_in_home_ssh:
return keys_in_home_ssh[fingerprint]
sshdir = os.path.expanduser('~/.ssh')
keyfiles = ('id_ed25519', 'id_ed25519_sk', 'id_rsa', 'id_ecdsa', 'id_ecdsa_sk', 'id_dsa')
for keyfile in keyfiles:
keyfile_path = os.path.join(sshdir, keyfile)
if os.path.isfile(keyfile_path):
return keyfile_path
return None
def ssh_sign(self, data, namespace, keyfile=None):
try:
data = bytes(data, 'utf-8')
except:
pass
if not keyfile:
keyfile = self.guess_keyfile()
else:
if '/' not in keyfile:
keyfile = '~/.ssh/' + keyfile
keyfile = os.path.expanduser(keyfile)
cmd = ['ssh-keygen', '-Y', 'sign', '-f', keyfile, '-n', namespace, '-q']
proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
stdout, _ = proc.communicate(data)
if proc.returncode:
raise oscerr.OscIOError(None, 'ssh-keygen signature creation failed: %d' % proc.returncode)
signature = decode_it(stdout)
match = re.match(r"\A-----BEGIN SSH SIGNATURE-----\n(.*)\n-----END SSH SIGNATURE-----", signature, re.S)
if not match:
raise oscerr.OscIOError(None, 'could not extract ssh signature')
return base64.b64decode(match.group(1))
def get_authorization(self, chal):
realm = chal.get('realm', '')
now = int(time.time())
sigdata = "(created): %d" % now
signature = self.ssh_sign(sigdata, realm, self.sshkey)
signature = decode_it(base64.b64encode(signature))
return 'keyId="%s",algorithm="ssh",headers="(created)",created=%d,signature="%s"' \
% (self.user, now, signature)
def add_signature_auth_header(self, req, auth):
token, challenge = auth.split(' ', 1)
chal = urllib.request.parse_keqv_list(filter(None, urllib.request.parse_http_list(challenge)))
auth = self.get_authorization(chal)
if not auth:
return False
auth_val = 'Signature %s' % auth
req.add('Authorization', auth_val)
return True
def set_request_headers(self, url, request_headers):
return False
def set_request_headers_after_401(self, url, request_headers, response):
auth_schemes = self._get_auth_schemes(response)
if "signature" not in auth_schemes:
# unsupported on server
return False
if not self.user:
return False
if self.basic_auth_password and "basic" in auth_schemes:
# prefer basic auth, but only if password is set
return False
if not self.sshkey_known():
# ssh key not set, try to guess it
self.sshkey = self.guess_keyfile()
if not self.sshkey_known():
# ssh key cannot be guessed
return False
return self.add_signature_auth_header(request_headers, auth_schemes["signature"])
def process_response(self, url, request_headers, response):
pass
def sshkey_known(self):
return self.sshkey is not None

View File

@ -34,21 +34,11 @@ try:
except ImportError:
distro = None
try:
from urllib.parse import urlsplit, urlunsplit, urlparse, quote_plus, urlencode, unquote
from urllib.error import HTTPError, URLError
from urllib.request import pathname2url, install_opener, urlopen
from urllib.request import Request as URLRequest
from io import StringIO
from http.client import IncompleteRead
except ImportError:
#python 2.x
from urlparse import urlsplit, urlunsplit, urlparse
from urllib import pathname2url, quote_plus, urlencode, unquote
from urllib2 import HTTPError, URLError, install_opener, urlopen
from urllib2 import Request as URLRequest
from cStringIO import StringIO
from httplib import IncompleteRead
from urllib.parse import urlsplit, urlunsplit, urlparse, quote_plus, urlencode, unquote
from urllib.error import HTTPError
from urllib.request import pathname2url
from io import StringIO
from http.client import IncompleteRead
try:
# Works up to Python 3.8, needed for Python < 3.3 (inc 2.7)
@ -60,6 +50,7 @@ except ImportError:
from . import oscerr
from . import conf
from .connection import http_request, http_GET, http_POST, http_PUT, http_DELETE
try:
from functools import cmp_to_key
@ -3382,87 +3373,6 @@ def makeurl(baseurl, l, query=[]):
return urlunsplit((scheme, netloc, '/'.join([path] + list(l)), query, ''))
def http_request(method, url, headers={}, data=None, file=None):
"""wrapper around urllib2.urlopen for error handling,
and to support additional (PUT, DELETE) methods"""
class DataContext:
"""Wrap a data value (or None) in a context manager."""
def __init__(self, data):
self._data = data
def __enter__(self):
return self._data
def __exit__(self, exc_type, exc_val, exc_tb):
return None
if file is not None and data is not None:
raise RuntimeError('file and data are mutually exclusive')
if conf.config['http_debug']:
print('\n\n--', method, url, file=sys.stderr)
if method == 'POST' and not file and not data:
# adding data to an urllib2 request transforms it into a POST
data = b''
req = URLRequest(url)
api_host_options = {}
apiurl = conf.extract_known_apiurl(url)
if apiurl is not None:
# ok no external request
install_opener(conf._build_opener(apiurl))
api_host_options = conf.get_apiurl_api_host_options(apiurl)
for header, value in api_host_options['http_headers']:
req.add_header(header, value)
req.get_method = lambda: method
# POST requests are application/x-www-form-urlencoded per default
# but sending data requires an octet-stream type
if method == 'PUT' or (method == 'POST' and (data or file)):
req.add_header('Content-Type', 'application/octet-stream')
if isinstance(headers, type({})):
for i in headers.keys():
print(headers[i])
req.add_header(i, headers[i])
if conf.config['debug']: print(method, url, file=sys.stderr)
content_length = None
if data is not None:
if isinstance(data, str):
data = data.encode('utf-8')
content_length = len(data)
elif file is not None:
content_length = os.path.getsize(file)
with (open(file, 'rb') if file is not None else DataContext(data)) as d:
req.data = d
if content_length is not None:
# do this after setting req.data because the corresponding setter
# kills an existing Content-Length header (see urllib.Request class
# (python38))
req.add_header('Content-Length', str(content_length))
try:
return urlopen(req)
except URLError as e:
e._osc_host_port = req.host
raise
finally:
if hasattr(conf.cookiejar, 'save'):
conf.cookiejar.save(ignore_discard=True)
def http_GET(*args, **kwargs): return http_request('GET', *args, **kwargs)
def http_POST(*args, **kwargs): return http_request('POST', *args, **kwargs)
def http_PUT(*args, **kwargs): return http_request('PUT', *args, **kwargs)
def http_DELETE(*args, **kwargs): return http_request('DELETE', *args, **kwargs)
def check_store_version(dir):
global store

View File

@ -1,414 +1,161 @@
# Copyright (C) 2009 Novell Inc.
# This program is free software; it may be used, copied, modified
# and distributed under the terms of the GNU General Public Licence,
# either version 2, or (at your option) any later version.
from __future__ import print_function
from M2Crypto.SSL.Checker import SSLVerificationError
from M2Crypto import m2, SSL, httpslib
import M2Crypto.m2urllib2
import binascii
import datetime
import errno
import os
import socket
import ssl
import sys
import inspect
import tempfile
import typing
try:
from urllib.parse import urlparse, splithost, splitport, splittype, urldefrag
from urllib.request import addinfourl
from http.client import HTTPSConnection
except ImportError:
#python 2.x
from urlparse import urlparse, urldefrag
from urllib import addinfourl, splithost, splitport, splittype
from httplib import HTTPSConnection
import urllib3.connection
from cryptography import x509
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from urllib3.util.ssl_ import create_urllib3_context
from . import oscerr
# based on openssl's include/openssl/x509_vfy.h.in
X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT = 18
X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN = 19
def create_ssl_context():
"""
Create a ssl context with disabled weak crypto.
Relatively safe defaults are set in urllib3 already,
but we restrict crypto even more.
"""
ssl_context = create_urllib3_context()
ssl_context.options |= ssl.OP_NO_SSLv2
ssl_context.options |= ssl.OP_NO_SSLv3
ssl_context.options |= ssl.OP_NO_TLSv1
ssl_context.options |= ssl.OP_NO_TLSv1_1
return ssl_context
class CertVerificationError(oscerr.OscBaseError):
def __str__(self):
args_str = [str(i) for i in self.args]
return "Certificate Verification Error: " + "\n".join(args_str)
from .core import raw_input
class TrustedCertStore:
_tmptrusted = {}
def __init__(self, host, port, app, cert):
self.cert = cert
def __init__(self, ssl_context, host, port):
self.ssl_context = ssl_context
self.host = host
if self.host == None:
raise Exception("empty host")
if port:
self.host += "_%d" % port
import os
self.dir = os.path.expanduser('~/.config/%s/trusted-certs' % app)
self.file = self.dir + '/%s.pem' % self.host
self.port = port or 443
def is_known(self):
if self.host in self._tmptrusted:
return True
if not self.host:
raise ValueError("Empty `host`")
import os
if os.path.exists(self.file):
return True
return False
file_name = f"{self.host}_{self.port}"
self.dir_path = os.path.expanduser("~/.config/osc/trusted-certs")
self.pem_path = os.path.join(self.dir_path, f"{file_name}.pem")
def is_trusted(self):
import os
if self.host in self._tmptrusted:
cert = self._tmptrusted[self.host]
if os.path.isfile(self.pem_path):
# load permanently trusted certificate that is stored on disk
with open(self.pem_path, "rb") as f:
self.cert = x509.load_pem_x509_certificate(f.read())
self.ssl_context.load_verify_locations(cafile=self.pem_path)
else:
if not os.path.exists(self.file):
return False
from M2Crypto import X509
cert = X509.load_cert(self.file)
if self.cert.as_pem() == cert.as_pem():
return True
else:
return False
self.cert = None
def trust_tmp(self):
self._tmptrusted[self.host] = self.cert
def get_server_certificate(self):
# The following code throws an exception on self-signed certs,
# therefore we need to retrieve the cert differently.
# pem = ssl.get_server_certificate((self.host, self.port))
def trust_always(self):
self.trust_tmp()
from M2Crypto import X509
import os
if not os.path.exists(self.dir):
os.makedirs(self.dir)
self.cert.save_pem(self.file)
ssl_context = create_ssl_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
sock = ssl_context.wrap_socket(socket.socket(), server_hostname=self.host)
sock.connect((self.host, self.port))
der = sock.getpeercert(binary_form=True)
pem = ssl.DER_cert_to_PEM_cert(der)
cert = x509.load_pem_x509_certificate(pem.encode("utf-8"))
return cert
# verify_cb is called for each error once
# we only collect the errors and return suceess
# connection will be aborted later if it needs to
def verify_cb(ctx, ok, store):
if not ctx.verrs:
ctx.verrs = ValidationErrors()
try:
if not ok:
ctx.verrs.record(store.get_current_cert(), store.get_error(), store.get_error_depth())
return 1
except Exception as e:
print(e, file=sys.stderr)
return 0
class FailCert:
def __init__(self, cert):
def trust_permanently(self, cert):
"""
Permanently trust the certificate.
Store it as a pem file in ~/.config/osc/trusted-certs.
"""
self.cert = cert
self.errs = []
data = self.cert.public_bytes(serialization.Encoding.PEM)
with open(self.pem_path, "wb") as f:
f.write(data)
self.ssl_context.load_verify_locations(cafile=self.pem_path)
class ValidationErrors:
def trust_temporarily(self, cert):
"""
Temporarily trust the certificate.
"""
self.cert = cert
tmp_dir = os.path.expanduser("~/.config/osc")
data = self.cert.public_bytes(serialization.Encoding.PEM)
with tempfile.NamedTemporaryFile(mode="wb+", dir=tmp_dir, prefix="temp_trusted_cert_") as f:
f.write(data)
f.flush()
self.ssl_context.load_verify_locations(cafile=f.name)
def __init__(self):
self.chain_ok = True
self.cert_ok = True
self.failures = {}
def record(self, cert, err, depth):
#print "cert for %s, level %d fail(%d)" % ( cert.get_subject().commonName, depth, err )
if depth == 0:
self.cert_ok = False
else:
self.chain_ok = False
if not depth in self.failures:
self.failures[depth] = FailCert(cert)
else:
if self.failures[depth].cert.get_fingerprint() != cert.get_fingerprint():
raise Exception("Certificate changed unexpectedly. This should not happen")
self.failures[depth].errs.append(err)
def show(self, out):
for depth in self.failures.keys():
cert = self.failures[depth].cert
print("*** certificate verify failed at depth %d" % depth, file=out)
print("Subject: ", cert.get_subject(), file=out)
print("Issuer: ", cert.get_issuer(), file=out)
print("Valid: ", cert.get_not_before(), "-", cert.get_not_after(), file=out)
print("Fingerprint(MD5): ", cert.get_fingerprint('md5'), file=out)
print("Fingerprint(SHA1): ", cert.get_fingerprint('sha1'), file=out)
for err in self.failures[depth].errs:
reason = "Unknown"
try:
import M2Crypto.Err
reason = M2Crypto.Err.get_x509_verify_error(err)
except:
pass
print("Reason:", reason, file=out)
# check if the encountered errors could be ignored
def could_ignore(self):
if not 0 in self.failures:
return True
nonfatal_errors = [
m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY,
m2.X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN,
m2.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT,
m2.X509_V_ERR_CERT_UNTRUSTED,
m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE,
m2.X509_V_ERR_CERT_NOT_YET_VALID,
m2.X509_V_ERR_CERT_HAS_EXPIRED,
m2.X509_V_OK,
]
canignore = True
for err in self.failures[0].errs:
if not err in nonfatal_errors:
canignore = False
break
return canignore
class mySSLContext(SSL.Context):
def __init__(self):
SSL.Context.__init__(self, 'sslv23')
self.set_options(m2.SSL_OP_NO_SSLv2 | m2.SSL_OP_NO_SSLv3)
self.set_cipher_list("ECDHE-RSA-AES128-SHA256:AES128-GCM-SHA256:RC4:HIGH:!MD5:!aNULL:!EDH")
self.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT)
self.verrs = None
#self.set_info_callback() # debug
self.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, depth=9, callback=lambda ok, store: verify_cb(self, ok, store))
class myHTTPSHandler(M2Crypto.m2urllib2.HTTPSHandler):
handler_order = 499
def __init__(self, *args, **kwargs):
self.appname = kwargs.pop('appname', 'generic')
M2Crypto.m2urllib2.HTTPSHandler.__init__(self, *args, **kwargs)
# copied from M2Crypto.m2urllib2.HTTPSHandler
# it's sole purpose is to use our myHTTPSHandler/myHTTPSProxyHandler class
# ideally the m2urllib2.HTTPSHandler.https_open() method would be split into
# "do_open()" and "https_open()" so that we just need to override
# the small "https_open()" method...)
def https_open(self, req):
# https://docs.python.org/3.3/library/urllib.request.html#urllib.request.Request.get_host
try: # up to python-3.2
host = req.get_host()
except AttributeError: # from python-3.3
host = req.host
if not host:
raise M2Crypto.m2urllib2.URLError('no host given')
# Our change: Check to see if we're using a proxy.
# Then create an appropriate ssl-aware connection.
full_url = req.get_full_url()
target_host = urlparse(full_url)[1]
if target_host != host:
request_uri = urldefrag(full_url)[0]
h = myProxyHTTPSConnection(host=host, appname=self.appname, ssl_context=self.ctx)
else:
try: # up to python-3.2
request_uri = req.get_selector()
except AttributeError: # from python-3.3
request_uri = req.selector
h = myHTTPSConnection(host=host, appname=self.appname, ssl_context=self.ctx)
# End our change
h.set_debuglevel(self._debuglevel)
headers = dict(req.headers)
headers.update(req.unredirected_hdrs)
# We want to make an HTTP/1.1 request, but the addinfourl
# class isn't prepared to deal with a persistent connection.
# It will try to read all remaining data from the socket,
# which will block while the server waits for the next request.
# So make sure the connection gets closed after the (only)
# request.
headers["Connection"] = "close"
@staticmethod
def _display_cert(cert):
print("Subject:", cert.subject.rfc4514_string())
print("Issuer:", cert.issuer.rfc4514_string())
try:
h.request(req.get_method(), request_uri, req.data, headers)
r = h.getresponse()
except socket.error as err: # XXX what error?
raise M2Crypto.m2urllib2.URLError(err)
san_ext = cert.extensions.get_extension_for_oid(x509.ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
san_ext_value = typing.cast(x509.SubjectAlternativeName, san_ext.value)
san_ext_dnsnames = san_ext_value.get_values_for_type(x509.DNSName)
except x509.extensions.ExtensionNotFound:
san_ext_dnsnames = ["(not available)"]
for san in san_ext_dnsnames:
print("subjectAltName:", san)
print("Valid:", cert.not_valid_before, "->", cert.not_valid_after)
print("Fingerprint(MD5):", binascii.hexlify(cert.fingerprint(hashes.MD5())).decode("utf-8"))
print("Fingerprint(SHA1):", binascii.hexlify(cert.fingerprint(hashes.SHA1())).decode("utf-8"))
# Pick apart the HTTPResponse object to get the addinfourl
# object initialized properly.
# Wrap the HTTPResponse object in socket's file object adapter
# for Windows. That adapter calls recv(), so delegate recv()
# to read(). This weird wrapping allows the returned object to
# have readline() and readlines() methods.
r.recv = r.read
if (sys.version_info < (3, 0)):
fp = socket._fileobject(r, close=True)
def prompt_trust(self, cert, reason):
if self.cert:
# check if the certificate matches the already trusted certificate for the host and port
if cert != self.cert:
raise CertVerificationError([
"Remote host identification has changed",
"",
"WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED!",
"IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY!",
"",
f"Offending certificate is at '{self.pem_path}'"
])
else:
r._decref_socketios = lambda: None
r.ssl = h.sock.ssl
r._timeout = -1.0
# hack to bypass python3 bug with 0 buffer size and
# http/client.py readinto method for response class
if r.length is not None and r.length == 0:
r.readinto = lambda b: 0
r.recv_into = r.readinto
fp = socket.SocketIO(r, 'rb')
# since there is no trusted certificate on disk,
# let's display the server cert and give user options to trust it
print("The server certificate failed verification")
print()
self._display_cert(cert)
print(f"Reason: {reason}")
resp = addinfourl(fp, r.msg, req.get_full_url())
resp.code = r.status
resp.msg = r.reason
return resp
class myHTTPSConnection(M2Crypto.httpslib.HTTPSConnection):
def __init__(self, *args, **kwargs):
self.appname = kwargs.pop('appname', 'generic')
M2Crypto.httpslib.HTTPSConnection.__init__(self, *args, **kwargs)
def _connect(self, family):
# workaround for old M2Crypto versions where the the
# SSL.Connection.__init__ constructor has no "family" parameter
kwargs = {}
argspec = inspect.getargspec(SSL.Connection.__init__)
if 'family' in argspec.args:
kwargs['family'] = family
elif family != socket.AF_INET:
# old SSL.Connection classes implicitly use socket.AF_INET
return False
self.sock = SSL.Connection(self.ssl_ctx, **kwargs)
if self.session:
self.sock.set_session(self.session)
if hasattr(self.sock, 'set_tlsext_host_name'):
self.sock.set_tlsext_host_name(self.host)
self.sock.connect((self.host, self.port))
return True
def connect(self):
# based on M2Crypto.httpslib.HTTPSConnection.connect
last_exc = None
connected = False
for addrinfo in socket.getaddrinfo(self.host, self.port,
socket.AF_UNSPEC,
socket.SOCK_STREAM,
0, 0):
try:
connected = self._connect(addrinfo[0])
if connected:
break
except socket.error as e:
last_exc = e
finally:
if not connected and self.sock is not None:
self.sock.close()
if not connected:
if last_exc is None:
msg = 'getaddrinfo returned empty list or unsupported families'
raise RuntimeError(msg)
raise last_exc
# ok we are connected, verify cert
verify_certificate(self)
def getHost(self):
return self.host
def getPort(self):
return self.port
class myProxyHTTPSConnection(M2Crypto.httpslib.ProxyHTTPSConnection, HTTPSConnection):
def __init__(self, *args, **kwargs):
self.appname = kwargs.pop('appname', 'generic')
M2Crypto.httpslib.ProxyHTTPSConnection.__init__(self, *args, **kwargs)
def _start_ssl(self):
M2Crypto.httpslib.ProxyHTTPSConnection._start_ssl(self)
verify_certificate(self)
def endheaders(self, *args, **kwargs):
if self._proxy_auth is None:
self._proxy_auth = self._encode_auth()
HTTPSConnection.endheaders(self, *args, **kwargs)
# broken in m2crypto: port needs to be an int
def putrequest(self, method, url, skip_host=0, skip_accept_encoding=0):
#putrequest is called before connect, so can interpret url and get
#real host/port to be used to make CONNECT request to proxy
proto, rest = splittype(url)
if proto is None:
raise ValueError("unknown URL type: %s" % url)
#get host
host, rest = splithost(rest)
#try to get port
host, port = splitport(host)
#if port is not defined try to get from proto
if port is None:
try:
port = self._ports[proto]
except KeyError:
raise ValueError("unknown protocol for: %s" % url)
self._real_host = host
self._real_port = int(port)
M2Crypto.httpslib.HTTPSConnection.putrequest(self, method, url, skip_host, skip_accept_encoding)
def getHost(self):
return self._real_host
def getPort(self):
return self._real_port
def verify_certificate(connection):
ctx = connection.sock.ctx
verrs = ctx.verrs
ctx.verrs = None
cert = connection.sock.get_peer_cert()
if not cert:
connection.close()
raise SSLVerificationError("server did not present a certificate")
# XXX: should be check if the certificate is known anyways?
# Maybe it changed to something valid.
if not connection.sock.verify_ok():
tc = TrustedCertStore(connection.getHost(), connection.getPort(), connection.appname, cert)
if tc.is_known():
if tc.is_trusted(): # ok, same cert as the stored one
return
else:
print("WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED!", file=sys.stderr)
print("IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY!", file=sys.stderr)
print("offending certificate is at '%s'" % tc.file, file=sys.stderr)
raise SSLVerificationError("remote host identification has changed")
# if http_debug is set we redirect sys.stdout to an StringIO
# instance in order to do some header filtering (see conf module)
# so we have to use the "original" stdout for printing
out = getattr(connection, '_orig_stdout', sys.stdout)
verrs.show(out)
print(file=out)
if not verrs.could_ignore():
raise SSLVerificationError("Certificate validation error cannot be ignored")
if not verrs.chain_ok:
print("A certificate in the chain failed verification", file=out)
if not verrs.cert_ok:
print("The server certificate failed verification", file=out)
while True:
print("""
while True:
print("""
Would you like to
0 - quit (default)
1 - continue anyways
2 - trust the server certificate permanently
9 - review the server certificate
""", file=out)
""")
print("Enter choice [0129]: ", end='', file=out)
r = raw_input()
if not r or r == '0':
connection.close()
raise SSLVerificationError("Untrusted Certificate")
elif r == '1':
tc.trust_tmp()
return
elif r == '2':
tc.trust_always()
return
elif r == '9':
print(cert.as_text(), file=out)
# vim: sw=4 et
print("Enter choice [0129]: ", end="")
r = input()
if not r or r == "0":
raise CertVerificationError(["Untrusted certificate"])
elif r == "1":
self.trust_temporarily(cert)
return
elif r == "2":
self.trust_permanently(cert)
return
elif r == "9":
print(cert.to_txt())

View File

@ -1,8 +0,0 @@
class NoSecureSSLError(Exception):
def __init__(self, msg):
Exception.__init__(self)
self.msg = msg
def __str__(self):
return self.msg
# vim: sw=4 et

View File

@ -103,7 +103,7 @@ setuptools.setup(
packages=['osc', 'osc.util'],
scripts=['osc-wrapper.py'],
data_files=data_files,
install_requires=['M2Crypto', 'chardet'],
install_requires=['cryptography', 'urllib3'],
classifiers=[
"Development Status :: 5 - Production/Stable",
"Environment :: Console",

View File

@ -1,29 +1,18 @@
import unittest
import osc.core
import shutil
import tempfile
import io
import os
import shutil
import sys
try:
# Works up to Python 3.8, needed for Python < 3.3 (inc 2.7)
from xml.etree import cElementTree as ET
except ImportError:
# will import a fast implementation from 3.3 onwards, needed
# for 3.9+
from xml.etree import ElementTree as ET
EXPECTED_REQUESTS = []
import tempfile
import unittest
from unittest.mock import patch
from urllib.request import HTTPHandler, addinfourl, build_opener
from urllib.parse import urlparse, parse_qs
from xml.etree import ElementTree as ET
try:
#python 2.x
from cStringIO import StringIO
from urllib2 import HTTPHandler, addinfourl, build_opener
from urlparse import urlparse, parse_qs
except ImportError:
from io import StringIO
from urllib.request import HTTPHandler, addinfourl, build_opener
from urllib.parse import urlparse, parse_qs
import urllib3.response
import osc.core
from io import BytesIO
def urlcompare(url, *args):
"""compare all components of url except query string - it is converted to
@ -94,94 +83,122 @@ class RequestDataMismatch(Exception):
def __str__(self):
return '%s, %s, %s' % (self.url, self.got, self.exp)
class MyHTTPHandler(HTTPHandler):
def __init__(self, exp_requests, fixtures_dir):
HTTPHandler.__init__(self)
self.__exp_requests = exp_requests
self.__fixtures_dir = fixtures_dir
def http_open(self, req):
r = self.__exp_requests.pop(0)
if not urlcompare(req.get_full_url(), r[1]) or req.get_method() != r[0]:
raise RequestWrongOrder(req.get_full_url(), r[1], req.get_method(), r[0])
if req.get_method() in ('GET', 'DELETE'):
return self.__mock_GET(r[1], **r[2])
elif req.get_method() in ('PUT', 'POST'):
return self.__mock_PUT(req, **r[2])
EXPECTED_REQUESTS = []
def __mock_GET(self, fullurl, **kwargs):
return self.__get_response(fullurl, **kwargs)
def __mock_PUT(self, req, **kwargs):
exp = kwargs.get('exp', None)
if exp is not None and 'expfile' in kwargs:
raise RuntimeError('either specify exp or expfile')
elif 'expfile' in kwargs:
exp = open(os.path.join(self.__fixtures_dir, kwargs['expfile']), 'rb').read()
elif exp is None:
raise RuntimeError('exp or expfile required')
# HACK: Fix "ValueError: I/O operation on closed file." error in tests on openSUSE Leap 15.2.
# The problem seems to appear only in the tests, possibly some interaction with MockHTTPConnectionPool.
# Porting 753fbc03 to urllib3 in openSUSE Leap 15.2 would fix the problem.
urllib3.response.HTTPResponse.__iter__ = lambda self : iter(self._fp)
class MockHTTPConnectionPool:
def __init__(self, host, port=None, **conn_kw):
pass
def urlopen(self, method, url, body=None, headers=None, retries=None, **response_kw):
global EXPECTED_REQUESTS
request = EXPECTED_REQUESTS.pop(0)
url = f"http://localhost{url}"
if not urlcompare(request["url"], url) or request["method"] != method:
raise RequestWrongOrder(request["url"], url, request["method"], method)
if method in ("POST", "PUT"):
if 'exp' not in request and 'expfile' in request:
with open(request['expfile'], 'rb') as f:
exp = f.read()
elif 'exp' in request and 'expfile' not in request:
exp = request['exp'].encode('utf-8')
else:
raise RuntimeError('Specify either `exp` or `expfile`')
body = body or b""
if hasattr(body, "read"):
# if it is a file-like object, read it
body = body.read()
if hasattr(body, "encode"):
# if it can be encoded to bytes, do it
body = body.encode("utf-8")
if body != exp:
# We do not have a notion to explicitly mark xml content. In case
# of xml, we do not care about the exact xml representation (for
# now). Hence, if both, data and exp, are xml and are "equal",
# everything is fine (for now); otherwise, error out
# (of course, this is problematic if we want to ensure that XML
# documents are bit identical...)
if not xml_equal(body, exp):
raise RequestDataMismatch(url, repr(body), repr(exp))
if 'exception' in request:
raise request["exception"]
if 'text' not in request and 'file' in request:
with open(request['file'], 'rb') as f:
data = f.read()
elif 'text' in request and 'file' not in request:
data = request['text'].encode('utf-8')
else:
# for now, assume exp is a str
exp = exp.encode('utf-8')
# use req.data instead of req.get_data() for python3 compatiblity
data = req.data
if hasattr(data, 'read'):
data = data.read()
if data != exp:
# We do not have a notion to explicitly mark xml content. In case
# of xml, we do not care about the exact xml representation (for
# now). Hence, if both, data and exp, are xml and are "equal",
# everything is fine (for now); otherwise, error out
# (of course, this is problematic if we want to ensure that XML
# documents are bit identical...)
if not xml_equal(data, exp):
raise RequestDataMismatch(req.get_full_url(), repr(data), repr(exp))
return self.__get_response(req.get_full_url(), **kwargs)
raise RuntimeError('Specify either `text` or `file`')
def __get_response(self, url, **kwargs):
f = None
if 'exception' in kwargs:
raise kwargs['exception']
if 'text' not in kwargs and 'file' in kwargs:
f = BytesIO(open(os.path.join(self.__fixtures_dir, kwargs['file']), 'rb').read())
elif 'text' in kwargs and 'file' not in kwargs:
f = BytesIO(kwargs['text'].encode('utf-8'))
else:
raise RuntimeError('either specify text or file')
resp = addinfourl(f, {}, url)
resp.code = kwargs.get('code', 200)
resp.msg = ''
return resp
response = urllib3.response.HTTPResponse(body=data, status=request.get("code", 200))
response._fp = io.BytesIO(data)
def urldecorator(method, fullurl, **kwargs):
return response
def urldecorator(method, url, **kwargs):
def decorate(test_method):
def wrapped_test_method(*args):
addExpectedRequest(method, fullurl, **kwargs)
test_method(*args)
# "rename" method otherwise we cannot specify a TestCaseClass.testName
# cmdline arg when using unittest.main()
def wrapped_test_method(self):
# put all args into a single dictionary
kwargs["method"] = method
kwargs["url"] = url
# prepend fixtures dir to `file`
if "file" in kwargs:
kwargs["file"] = os.path.join(self._get_fixtures_dir(), kwargs["file"])
# prepend fixtures dir to `expfile`
if "expfile" in kwargs:
kwargs["expfile"] = os.path.join(self._get_fixtures_dir(), kwargs["expfile"])
EXPECTED_REQUESTS.append(kwargs)
test_method(self)
# mock connection pool, but only just once
if not hasattr(test_method, "_MockHTTPConnectionPool"):
wrapped_test_method = patch('urllib3.HTTPConnectionPool', MockHTTPConnectionPool)(wrapped_test_method)
wrapped_test_method._MockHTTPConnectionPool = True
wrapped_test_method.__name__ = test_method.__name__
return wrapped_test_method
return decorate
def GET(fullurl, **kwargs):
return urldecorator('GET', fullurl, **kwargs)
def PUT(fullurl, **kwargs):
return urldecorator('PUT', fullurl, **kwargs)
def GET(path, **kwargs):
return urldecorator('GET', path, **kwargs)
def POST(fullurl, **kwargs):
return urldecorator('POST', fullurl, **kwargs)
def DELETE(fullurl, **kwargs):
return urldecorator('DELETE', fullurl, **kwargs)
def PUT(path, **kwargs):
return urldecorator('PUT', path, **kwargs)
def POST(path, **kwargs):
return urldecorator('POST', path, **kwargs)
def DELETE(path, **kwargs):
return urldecorator('DELETE', path, **kwargs)
def addExpectedRequest(method, url, **kwargs):
global EXPECTED_REQUESTS
EXPECTED_REQUESTS.append((method, url, kwargs))
class OscTestCase(unittest.TestCase):
def setUp(self, copytree=True):
global EXPECTED_REQUESTS
EXPECTED_REQUESTS = []
os.chdir(os.path.dirname(__file__))
oscrc = os.path.join(self._get_fixtures_dir(), 'oscrc')
osc.core.conf.get_config(override_conffile=oscrc,
@ -191,19 +208,16 @@ class OscTestCase(unittest.TestCase):
self.tmpdir = tempfile.mkdtemp(prefix='osc_test')
if copytree:
shutil.copytree(os.path.join(self._get_fixtures_dir(), 'osctest'), os.path.join(self.tmpdir, 'osctest'))
global EXPECTED_REQUESTS
EXPECTED_REQUESTS = []
osc.core.conf._build_opener = lambda u: build_opener(MyHTTPHandler(EXPECTED_REQUESTS, self._get_fixtures_dir()))
self.stdout = sys.stdout
sys.stdout = StringIO()
sys.stdout = io.StringIO()
def tearDown(self):
self.assertTrue(len(EXPECTED_REQUESTS) == 0)
sys.stdout = self.stdout
try:
shutil.rmtree(self.tmpdir)
except:
pass
self.assertTrue(len(EXPECTED_REQUESTS) == 0)
def _get_fixtures_dir(self):
raise NotImplementedError('subclasses should implement this method')

View File

@ -4,48 +4,41 @@ import osc.oscerr
import os
import re
import sys
from .common import GET, POST, OscTestCase, addExpectedRequest, EXPECTED_REQUESTS
from .common import GET, POST, OscTestCase, EXPECTED_REQUESTS
FIXTURES_DIR = os.path.join(os.path.dirname(__file__), 'prdiff_fixtures')
API_URL = 'http://localhost/'
UPSTREAM = 'some:project'
BRANCH = 'home:user:branches:' + UPSTREAM
def rdiff_url(pkg, oldprj, newprj):
return API_URL + 'source/%s/%s?unified=1&opackage=%s&oproject=%s&cmd=diff&expand=1&tarlimit=0&filelimit=0' % \
return 'http://localhost/source/%s/%s?unified=1&opackage=%s&oproject=%s&cmd=diff&expand=1&tarlimit=0&filelimit=0' % \
(newprj, pkg, pkg, oldprj.replace(':', '%3A'))
def request_url(prj):
return API_URL + 'search/request?match=%%28state%%2F%%40name%%3D%%27new%%27+or+state%%2F%%40name%%3D%%27review%%27%%29+and+%%28action%%2Ftarget%%2F%%40project%%3D%%27%s%%27+or+action%%2Fsource%%2F%%40project%%3D%%27%s%%27%%29' % \
return 'http://localhost/search/request?match=%%28state%%2F%%40name%%3D%%27new%%27+or+state%%2F%%40name%%3D%%27review%%27%%29+and+%%28action%%2Ftarget%%2F%%40project%%3D%%27%s%%27+or+action%%2Fsource%%2F%%40project%%3D%%27%s%%27%%29' % \
tuple([prj.replace(':', '%3A')] * 2)
def GET_PROJECT_PACKAGES(*projects):
def decorator(test_method):
def wrapped_test_method(*args):
for project in projects:
addExpectedRequest('GET', API_URL + 'source/' + project,
file='%s/directory' % project)
test_method(*args)
# "rename" method otherwise we cannot specify a TestCaseClass.testName
# cmdline arg when using unittest.main()
wrapped_test_method.__name__ = test_method.__name__
return wrapped_test_method
# decorators get applied in the reversed order (bottom-up)
for project in reversed(projects):
test_method = GET(f'http://localhost/source/{project}', file=f'{project}/directory')(test_method)
return test_method
return decorator
def POST_RDIFF(oldprj, newprj):
def decorator(test_method):
def wrapped_test_method(*args):
addExpectedRequest('POST', rdiff_url('common-one', oldprj, newprj), exp='', text='')
addExpectedRequest('POST', rdiff_url('common-two', oldprj, newprj), exp='', file='common-two-diff')
addExpectedRequest('POST', rdiff_url('common-three', oldprj, newprj), exp='', text='')
test_method(*args)
# "rename" method otherwise we cannot specify a TestCaseClass.testName
# cmdline arg when using unittest.main()
wrapped_test_method.__name__ = test_method.__name__
return wrapped_test_method
# decorators get applied in the reversed order (bottom-up)
test_method = POST(rdiff_url('common-three', oldprj, newprj), exp='', text='')(test_method)
test_method = POST(rdiff_url('common-two', oldprj, newprj), exp='', file='common-two-diff')(test_method)
test_method = POST(rdiff_url('common-one', oldprj, newprj), exp='', text='')(test_method)
return test_method
return decorator
def suite():
import unittest
return unittest.makeSuite(TestProjectDiff)