mirror of
https://github.com/openSUSE/osc.git
synced 2024-11-10 06:46:15 +01:00
Merge pull request #1000 from dmach/urllib3
Switch http_request() to urllib3
This commit is contained in:
commit
85dbd9f626
9
.github/workflows/unittests.yaml
vendored
9
.github/workflows/unittests.yaml
vendored
@ -30,8 +30,7 @@ jobs:
|
||||
- 'opensuse/tumbleweed'
|
||||
|
||||
# CentOS Stream
|
||||
# stream9 doesn't contain m2crypto required by osc
|
||||
# - 'quay.io/centos/centos:stream9'
|
||||
- 'quay.io/centos/centos:stream9'
|
||||
|
||||
# Debian
|
||||
- 'debian:stable'
|
||||
@ -50,7 +49,7 @@ jobs:
|
||||
zypper --non-interactive --gpg-auto-import-keys refresh
|
||||
zypper --non-interactive dist-upgrade
|
||||
zypper --non-interactive install git-lfs
|
||||
zypper --non-interactive install diffstat diffutils python3 python3-chardet python3-M2Crypto python3-pip python3-rpm python3-setuptools
|
||||
zypper --non-interactive install diffstat diffutils python3 python3-cryptography python3-pip python3-rpm python3-setuptools python3-urllib3
|
||||
|
||||
- name: 'Install packages (Fedora/CentOS)'
|
||||
if: ${{ startsWith(matrix.container, 'fedora:') || contains(matrix.container, 'centos:') }}
|
||||
@ -58,7 +57,7 @@ jobs:
|
||||
dnf -y makecache
|
||||
dnf -y distro-sync
|
||||
dnf -y install git-lfs
|
||||
dnf -y install diffstat diffutils python3 python3-chardet python3-m2crypto python3-pip python3-rpm python3-setuptools
|
||||
dnf -y install diffstat diffutils python3 python3-cryptography python3-pip python3-rpm python3-setuptools python3-urllib3
|
||||
|
||||
- name: 'Install packages (Debian/Ubuntu)'
|
||||
if: ${{ startsWith(matrix.container, 'debian:') || startsWith(matrix.container, 'ubuntu:') }}
|
||||
@ -66,7 +65,7 @@ jobs:
|
||||
apt-get -y update
|
||||
apt-get -y upgrade
|
||||
apt-get -y --no-install-recommends install git-lfs
|
||||
apt-get -y --no-install-recommends install diffstat diffutils python3 python3-chardet python3-m2crypto python3-pip python3-rpm python3-setuptools
|
||||
apt-get -y --no-install-recommends install diffstat diffutils python3 python3-cryptography python3-pip python3-rpm python3-setuptools python3-urllib3
|
||||
|
||||
- uses: actions/checkout@v3
|
||||
|
||||
|
@ -8,26 +8,18 @@ from __future__ import print_function
|
||||
import errno
|
||||
import os.path
|
||||
import pdb
|
||||
import ssl
|
||||
import sys
|
||||
import signal
|
||||
import traceback
|
||||
|
||||
from osc import oscerr
|
||||
from .oscsslexcp import NoSecureSSLError
|
||||
from .oscssl import CertVerificationError
|
||||
from osc.util.cpio import CpioError
|
||||
from osc.util.packagequery import PackageError
|
||||
from osc.util.helper import decode_it
|
||||
from osc.OscConfigParser import configparser
|
||||
|
||||
try:
|
||||
from M2Crypto.SSL.Checker import SSLVerificationError
|
||||
from M2Crypto.SSL import SSLError as SSLError
|
||||
except:
|
||||
class SSLError(Exception):
|
||||
pass
|
||||
class SSLVerificationError(Exception):
|
||||
pass
|
||||
|
||||
try:
|
||||
# import as RPMError because the class "error" is too generic
|
||||
from rpm import error as RPMError
|
||||
@ -36,13 +28,9 @@ except:
|
||||
class RPMError(Exception):
|
||||
pass
|
||||
|
||||
try:
|
||||
from http.client import HTTPException, BadStatusLine
|
||||
from urllib.error import URLError, HTTPError
|
||||
except ImportError:
|
||||
#python 2.x
|
||||
from httplib import HTTPException, BadStatusLine
|
||||
from urllib2 import URLError, HTTPError
|
||||
import urllib3.exceptions
|
||||
from http.client import HTTPException, BadStatusLine
|
||||
from urllib.error import URLError, HTTPError
|
||||
|
||||
# the good things are stolen from Matt Mackall's mercurial
|
||||
|
||||
@ -142,6 +130,10 @@ def run(prg, argv=None):
|
||||
msg += ' (%s)' % e._osc_host_port
|
||||
msg += ':\n'
|
||||
print(msg, e.reason, file=sys.stderr)
|
||||
except ssl.SSLError as e:
|
||||
if 'tlsv1' in str(e):
|
||||
print('The python on this system or the server does not support TLSv1.2', file=sys.stderr)
|
||||
print("SSL Error:", e, file=sys.stderr)
|
||||
except IOError as e:
|
||||
# ignore broken pipe
|
||||
if e.errno != errno.EPIPE:
|
||||
@ -182,14 +174,10 @@ def run(prg, argv=None):
|
||||
print('%s:' % e.fname, e.msg, file=sys.stderr)
|
||||
except RPMError as e:
|
||||
print(e, file=sys.stderr)
|
||||
except SSLError as e:
|
||||
if 'tlsv1' in str(e):
|
||||
print('The python on this system does not support TLSv1.2', file=sys.stderr)
|
||||
print("SSL Error:", e, file=sys.stderr)
|
||||
except SSLVerificationError as e:
|
||||
print("Certificate Verification Error:", e, file=sys.stderr)
|
||||
except NoSecureSSLError as e:
|
||||
except CertVerificationError as e:
|
||||
print(e, file=sys.stderr)
|
||||
except urllib3.exceptions.MaxRetryError as e:
|
||||
print(e.reason, file=sys.stderr)
|
||||
except CpioError as e:
|
||||
print(e, file=sys.stderr)
|
||||
except oscerr.OscBaseError as e:
|
||||
|
@ -35,7 +35,8 @@ except ImportError:
|
||||
# for 3.9+
|
||||
from xml.etree import ElementTree as ET
|
||||
|
||||
from .conf import config, cookiejar
|
||||
from . import connection
|
||||
from .conf import config
|
||||
|
||||
from .meter import create_text_meter
|
||||
|
||||
@ -1092,7 +1093,7 @@ def main(apiurl, opts, argv):
|
||||
http_debug = config['http_debug'],
|
||||
modules = bi.modules,
|
||||
enable_cpio=not opts.disable_cpio_bulk_download and bi.enable_cpio,
|
||||
cookiejar=cookiejar,
|
||||
cookiejar=CookieJarAuthHandler(os.path.expanduser(conf.config["cookiejar"]))._cookiejar,
|
||||
download_api_only=opts.download_api_only)
|
||||
|
||||
if not opts.trust_all_projects:
|
||||
|
403
osc/conf.py
403
osc/conf.py
@ -37,39 +37,18 @@ The configuration dictionary could look like this:
|
||||
"""
|
||||
|
||||
import bz2
|
||||
import base64
|
||||
import errno
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import ssl
|
||||
import getpass
|
||||
import time
|
||||
import subprocess
|
||||
|
||||
try:
|
||||
from http.cookiejar import LWPCookieJar, CookieJar
|
||||
from http.client import HTTPConnection, HTTPResponse
|
||||
from io import StringIO
|
||||
from urllib.parse import urlsplit
|
||||
from urllib.error import URLError
|
||||
from urllib.request import HTTPBasicAuthHandler, HTTPCookieProcessor, HTTPPasswordMgrWithDefaultRealm, ProxyHandler
|
||||
from urllib.request import AbstractHTTPHandler, build_opener, proxy_bypass, HTTPSHandler
|
||||
from urllib.request import BaseHandler, parse_keqv_list, parse_http_list
|
||||
except ImportError:
|
||||
#python 2.x
|
||||
from cookielib import LWPCookieJar, CookieJar
|
||||
from httplib import HTTPConnection, HTTPResponse
|
||||
from StringIO import StringIO
|
||||
from urlparse import urlsplit
|
||||
from urllib2 import URLError, HTTPBasicAuthHandler, HTTPCookieProcessor, HTTPPasswordMgrWithDefaultRealm, ProxyHandler
|
||||
from urllib2 import AbstractHTTPHandler, build_opener, proxy_bypass, HTTPSHandler
|
||||
from urllib2 import BaseHandler, parse_keqv_list, parse_http_list
|
||||
from io import StringIO
|
||||
from urllib.parse import urlsplit
|
||||
|
||||
from . import OscConfigParser
|
||||
from osc import oscerr
|
||||
from osc.util.helper import raw_input, decode_it
|
||||
from .oscsslexcp import NoSecureSSLError
|
||||
from osc.util.helper import raw_input
|
||||
from osc import credentials
|
||||
|
||||
GENERIC_KEYRING = False
|
||||
@ -424,12 +403,10 @@ Make sure that it has a [general] section.
|
||||
"""
|
||||
|
||||
config_missing_apiurl_text = """
|
||||
the apiurl \'%s\' does not exist in the config file. Please enter
|
||||
The apiurl \'%s\' does not exist in the config file. Please enter
|
||||
your credentials for this apiurl.
|
||||
"""
|
||||
|
||||
cookiejar = None
|
||||
|
||||
|
||||
def parse_apisrv_url(scheme, apisrv):
|
||||
if apisrv.startswith('http://') or apisrv.startswith('https://'):
|
||||
@ -437,8 +414,7 @@ def parse_apisrv_url(scheme, apisrv):
|
||||
elif scheme != None:
|
||||
url = scheme + apisrv
|
||||
else:
|
||||
msg = 'invalid apiurl \'%s\' (specify the protocol (http:// or https://))' % apisrv
|
||||
raise URLError(msg)
|
||||
url = "https://" + apisrv
|
||||
scheme, url, path = urlsplit(url)[0:3]
|
||||
return scheme, url, path.rstrip('/')
|
||||
|
||||
@ -502,334 +478,6 @@ def get_apiurl_usr(apiurl):
|
||||
return config['user']
|
||||
|
||||
|
||||
# workaround m2crypto issue:
|
||||
# if multiple SSL.Context objects are created
|
||||
# m2crypto only uses the last object which was created.
|
||||
# So we need to build a new opener everytime we switch the
|
||||
# apiurl (because different apiurls may have different
|
||||
# cafile/capath locations)
|
||||
def _build_opener(apiurl):
|
||||
from osc.core import __version__
|
||||
global config
|
||||
|
||||
class OscHTTPAuthHandler(HTTPBasicAuthHandler, object):
|
||||
# python2: inherit from object in order to make it a new-style class
|
||||
# (HTTPBasicAuthHandler is not a new-style class)
|
||||
|
||||
def __init__(self, password_mgr=None, signatureauthhandler=None):
|
||||
super(self.__class__, self).__init__(password_mgr)
|
||||
self.signatureauthhandler = signatureauthhandler
|
||||
|
||||
def add_parent(self, parent):
|
||||
super(self.__class__, self).add_parent(parent)
|
||||
if self.signatureauthhandler:
|
||||
self.signatureauthhandler.add_parent(parent)
|
||||
|
||||
def _rewind_request(self, req):
|
||||
if hasattr(req.data, 'seek'):
|
||||
# if the request is issued again (this time with an
|
||||
# Authorization header), the file's offset has to be
|
||||
# repositioned to the beginning of the file (otherwise,
|
||||
# a 0-length body is sent which most likely does not match
|
||||
# the Content-Length header (if present))
|
||||
req.data.seek(0)
|
||||
|
||||
def http_error_401(self, req, fp, code, msg, headers):
|
||||
self._rewind_request(req)
|
||||
authreqs = {}
|
||||
for authreq in headers.get_all('www-authenticate', []):
|
||||
scheme = authreq.split()[0].lower()
|
||||
authreqs[scheme] = authreq
|
||||
|
||||
if 'signature' in authreqs \
|
||||
and self.signatureauthhandler \
|
||||
and (
|
||||
# sshkey explicitly set in the config file, use it instead of doing basic auth
|
||||
self.signatureauthhandler.sshkey_known()
|
||||
or (
|
||||
# can't fall-back to basic auth, because server doesn't support it
|
||||
'basic' not in authreqs
|
||||
# can't fall-back to basic auth, because there's no password provided
|
||||
or not self.passwd.find_user_password(None, apiurl)[1]
|
||||
)):
|
||||
del headers['www-authenticate']
|
||||
headers['www-authenticate'] = authreqs['signature']
|
||||
return self.signatureauthhandler.http_error_401(req, fp, code, msg, headers)
|
||||
|
||||
if 'basic' in authreqs:
|
||||
del headers['www-authenticate']
|
||||
headers['www-authenticate'] = authreqs['basic']
|
||||
|
||||
response = super(self.__class__, self).http_error_401(req, fp, code, msg, headers)
|
||||
# workaround for http://bugs.python.org/issue9639
|
||||
if hasattr(self, 'retried'):
|
||||
self.retried = 0
|
||||
return response
|
||||
|
||||
class OscHTTPSignatureAuthHandler(BaseHandler, object):
|
||||
def __init__(self, user, sshkey):
|
||||
super(self.__class__, self).__init__()
|
||||
self.user = user
|
||||
self.sshkey = sshkey
|
||||
|
||||
def list_ssh_agent_keys(self):
|
||||
cmd = ['ssh-add', '-l']
|
||||
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
stdout, _ = proc.communicate()
|
||||
if proc.returncode == 0 and stdout.strip():
|
||||
return [self.get_fingerprint(line) for line in stdout.splitlines()]
|
||||
else:
|
||||
return []
|
||||
|
||||
def is_ssh_private_keyfile(self, keyfile_path):
|
||||
if not os.path.isfile(keyfile_path):
|
||||
return False
|
||||
with open(keyfile_path, "r") as f:
|
||||
try:
|
||||
line = f.readline(100).strip()
|
||||
except UnicodeDecodeError:
|
||||
# skip binary files
|
||||
return False
|
||||
if line == "-----BEGIN RSA PRIVATE KEY-----":
|
||||
return True
|
||||
if line == "-----BEGIN OPENSSH PRIVATE KEY-----":
|
||||
return True
|
||||
return False
|
||||
|
||||
def is_ssh_public_keyfile(self, keyfile_path):
|
||||
if not os.path.isfile(keyfile_path):
|
||||
return False
|
||||
return keyfile_path.endswith(".pub")
|
||||
|
||||
@staticmethod
|
||||
def get_fingerprint(line):
|
||||
parts = line.strip().split(b" ")
|
||||
if len(parts) < 2:
|
||||
raise ValueError("Unable to retrieve ssh key fingerprint from line: {}".format(line))
|
||||
return parts[1]
|
||||
|
||||
def list_ssh_dir_keys(self):
|
||||
sshdir = os.path.expanduser('~/.ssh')
|
||||
keys_in_home_ssh = {}
|
||||
for keyfile in os.listdir(sshdir):
|
||||
if keyfile.startswith(("agent-", "authorized_keys", "config", "known_hosts")):
|
||||
# skip files that definitely don't contain keys
|
||||
continue
|
||||
|
||||
keyfile_path = os.path.join(sshdir, keyfile)
|
||||
# public key alone may be sufficient because the private key
|
||||
# can get loaded into ssh-agent from gpg (yubikey works this way)
|
||||
is_public = self.is_ssh_public_keyfile(keyfile_path)
|
||||
# skip private detection if we think the key is a public one already
|
||||
is_private = False if is_public else self.is_ssh_private_keyfile(keyfile_path)
|
||||
|
||||
if not is_public and not is_private:
|
||||
continue
|
||||
|
||||
cmd = ["ssh-keygen", "-lf", keyfile_path]
|
||||
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
stdout, _ = proc.communicate()
|
||||
if proc.returncode == 0:
|
||||
fingerprint = self.get_fingerprint(stdout)
|
||||
if fingerprint and (fingerprint not in keys_in_home_ssh or is_private):
|
||||
# prefer path to a private key
|
||||
keys_in_home_ssh[fingerprint] = keyfile_path
|
||||
return keys_in_home_ssh
|
||||
|
||||
def guess_keyfile(self):
|
||||
keys_in_agent = self.list_ssh_agent_keys()
|
||||
if keys_in_agent:
|
||||
keys_in_home_ssh = self.list_ssh_dir_keys()
|
||||
for fingerprint in keys_in_agent:
|
||||
if fingerprint in keys_in_home_ssh:
|
||||
return keys_in_home_ssh[fingerprint]
|
||||
sshdir = os.path.expanduser('~/.ssh')
|
||||
keyfiles = ('id_ed25519', 'id_ed25519_sk', 'id_rsa', 'id_ecdsa', 'id_ecdsa_sk', 'id_dsa')
|
||||
for keyfile in keyfiles:
|
||||
keyfile_path = os.path.join(sshdir, keyfile)
|
||||
if os.path.isfile(keyfile_path):
|
||||
return keyfile_path
|
||||
raise oscerr.OscIOError(None, 'could not guess ssh identity keyfile')
|
||||
|
||||
def ssh_sign(self, data, namespace, keyfile=None):
|
||||
try:
|
||||
data = bytes(data, 'utf-8')
|
||||
except:
|
||||
pass
|
||||
if not keyfile:
|
||||
keyfile = self.guess_keyfile()
|
||||
else:
|
||||
if '/' not in keyfile:
|
||||
keyfile = '~/.ssh/' + keyfile
|
||||
keyfile = os.path.expanduser(keyfile)
|
||||
|
||||
cmd = ['ssh-keygen', '-Y', 'sign', '-f', keyfile, '-n', namespace, '-q']
|
||||
proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
|
||||
stdout, _ = proc.communicate(data)
|
||||
if proc.returncode:
|
||||
raise oscerr.OscIOError(None, 'ssh-keygen signature creation failed: %d' % proc.returncode)
|
||||
|
||||
signature = decode_it(stdout)
|
||||
match = re.match(r"\A-----BEGIN SSH SIGNATURE-----\n(.*)\n-----END SSH SIGNATURE-----", signature, re.S)
|
||||
if not match:
|
||||
raise oscerr.OscIOError(None, 'could not extract ssh signature')
|
||||
return base64.b64decode(match.group(1))
|
||||
|
||||
def get_authorization(self, req, chal):
|
||||
realm = chal.get('realm', '')
|
||||
now = int(time.time())
|
||||
sigdata = "(created): %d" % now
|
||||
signature = self.ssh_sign(sigdata, realm, self.sshkey)
|
||||
signature = decode_it(base64.b64encode(signature))
|
||||
return 'keyId="%s",algorithm="ssh",headers="(created)",created=%d,signature="%s"' \
|
||||
% (self.user, now, signature)
|
||||
|
||||
def retry_http_signature_auth(self, req, auth):
|
||||
old_auth_val = req.get_header('Authorization', None)
|
||||
if old_auth_val:
|
||||
old_scheme = old_auth_val.split()[0]
|
||||
if old_scheme.lower() == 'signature':
|
||||
return None
|
||||
token, challenge = auth.split(' ', 1)
|
||||
chal = parse_keqv_list(filter(None, parse_http_list(challenge)))
|
||||
auth = self.get_authorization(req, chal)
|
||||
if auth:
|
||||
auth_val = 'Signature %s' % auth
|
||||
req.add_unredirected_header('Authorization', auth_val)
|
||||
return self.parent.open(req, timeout=req.timeout)
|
||||
|
||||
def http_error_401(self, req, fp, code, msg, headers):
|
||||
authreq = headers.get('www-authenticate', None)
|
||||
if authreq:
|
||||
scheme = authreq.split()[0]
|
||||
if scheme.lower() == 'signature':
|
||||
return self.retry_http_signature_auth(req, authreq)
|
||||
raise ValueError("OscHTTPSignatureAuthHandler does not support"
|
||||
" the following scheme: '%s'" % scheme)
|
||||
|
||||
def sshkey_known(self):
|
||||
return self.sshkey is not None
|
||||
|
||||
|
||||
if 'last_opener' not in _build_opener.__dict__:
|
||||
_build_opener.last_opener = (None, None)
|
||||
if apiurl == _build_opener.last_opener[0]:
|
||||
return _build_opener.last_opener[1]
|
||||
|
||||
# respect no_proxy env variable
|
||||
if proxy_bypass(apiurl):
|
||||
# initialize with empty dict
|
||||
proxyhandler = ProxyHandler({})
|
||||
else:
|
||||
# read proxies from env
|
||||
proxyhandler = ProxyHandler()
|
||||
|
||||
options = config['api_host_options'][apiurl]
|
||||
signatureauthhandler = OscHTTPSignatureAuthHandler(options['user'], options['sshkey'])
|
||||
# with None as first argument, it will always use this username/password
|
||||
# combination for urls for which arg2 (apisrv) is a super-url
|
||||
authhandler = OscHTTPAuthHandler(HTTPPasswordMgrWithDefaultRealm(), signatureauthhandler)
|
||||
authhandler.add_password(None, apiurl, options['user'], options['pass'])
|
||||
|
||||
if options['sslcertck']:
|
||||
try:
|
||||
from . import oscssl
|
||||
from M2Crypto import m2urllib2
|
||||
except ImportError as e:
|
||||
print(e)
|
||||
raise NoSecureSSLError('M2Crypto is needed to access %s in a secure way.\nPlease install python-m2crypto.' % apiurl)
|
||||
|
||||
cafile = options.get('cafile', None)
|
||||
capath = options.get('capath', None)
|
||||
if not cafile and not capath:
|
||||
for i in ['/etc/pki/tls/cert.pem', '/etc/ssl/certs']:
|
||||
if os.path.isfile(i):
|
||||
cafile = i
|
||||
break
|
||||
elif os.path.isdir(i):
|
||||
capath = i
|
||||
break
|
||||
if not cafile and not capath:
|
||||
raise oscerr.OscIOError(None, 'No CA certificates found. (You may want to install ca-certificates-mozilla package)')
|
||||
ctx = oscssl.mySSLContext()
|
||||
if ctx.load_verify_locations(capath=capath, cafile=cafile) != 1:
|
||||
raise oscerr.OscIOError(None, 'No CA certificates found. (You may want to install ca-certificates-mozilla package)')
|
||||
opener = m2urllib2.build_opener(ctx, oscssl.myHTTPSHandler(ssl_context=ctx, appname='osc'), HTTPCookieProcessor(cookiejar), authhandler, proxyhandler)
|
||||
else:
|
||||
handlers = [HTTPCookieProcessor(cookiejar), authhandler, proxyhandler]
|
||||
try:
|
||||
# disable ssl cert check in python >= 2.7.9
|
||||
ctx = ssl._create_unverified_context()
|
||||
handlers.append(HTTPSHandler(context=ctx))
|
||||
except AttributeError:
|
||||
pass
|
||||
print("WARNING: SSL certificate checks disabled. Connection is insecure!\n", file=sys.stderr)
|
||||
opener = build_opener(*handlers)
|
||||
opener.addheaders = [('User-agent', 'osc/%s' % __version__)]
|
||||
_build_opener.last_opener = (apiurl, opener)
|
||||
return opener
|
||||
|
||||
|
||||
def init_basicauth(config, config_mtime):
|
||||
"""initialize urllib2 with the credentials for Basic Authentication"""
|
||||
|
||||
def filterhdrs(meth, ishdr, *hdrs):
|
||||
# this is so ugly but httplib doesn't use
|
||||
# a logger object or such
|
||||
def new_method(self, *args, **kwargs):
|
||||
# check if this is a recursive call (note: we do not
|
||||
# have to care about thread safety)
|
||||
is_rec_call = getattr(self, '_orig_stdout', None) is not None
|
||||
try:
|
||||
if not is_rec_call:
|
||||
self._orig_stdout = sys.stdout
|
||||
sys.stdout = StringIO()
|
||||
meth(self, *args, **kwargs)
|
||||
hdr = sys.stdout.getvalue()
|
||||
finally:
|
||||
# restore original stdout
|
||||
if not is_rec_call:
|
||||
sys.stdout = self._orig_stdout
|
||||
del self._orig_stdout
|
||||
for i in hdrs:
|
||||
if ishdr:
|
||||
hdr = re.sub(r'%s:[^\\r]*\\r\\n' % i, '', hdr)
|
||||
else:
|
||||
hdr = re.sub(i, '', hdr)
|
||||
sys.stdout.write(hdr)
|
||||
new_method.__name__ = meth.__name__
|
||||
return new_method
|
||||
|
||||
if config['http_debug'] and not config['http_full_debug']:
|
||||
HTTPConnection.send = filterhdrs(HTTPConnection.send, True, 'Cookie', 'Authorization')
|
||||
HTTPResponse.begin = filterhdrs(HTTPResponse.begin, False, 'header: Set-Cookie.*\n')
|
||||
|
||||
if config['http_debug']:
|
||||
# brute force
|
||||
def urllib2_debug_init(self, debuglevel=0):
|
||||
self._debuglevel = 1
|
||||
AbstractHTTPHandler.__init__ = urllib2_debug_init
|
||||
|
||||
cookie_file = os.path.expanduser(config['cookiejar'])
|
||||
if not os.path.exists(os.path.dirname(cookie_file)):
|
||||
os.makedirs(os.path.dirname(cookie_file), mode=0o700)
|
||||
global cookiejar
|
||||
cookiejar = LWPCookieJar(cookie_file)
|
||||
try:
|
||||
cookiejar.load(ignore_discard=True)
|
||||
if int(round(config_mtime)) > int(os.stat(cookie_file).st_mtime):
|
||||
cookiejar.clear()
|
||||
cookiejar.save()
|
||||
except IOError:
|
||||
try:
|
||||
fd = os.open(cookie_file, os.O_CREAT | os.O_WRONLY | os.O_TRUNC, 0o600)
|
||||
os.close(fd)
|
||||
except IOError:
|
||||
# hmm is any good reason why we should catch the IOError?
|
||||
#print 'Unable to create cookiejar file: \'%s\'. Using RAM-based cookies.' % cookie_file
|
||||
cookiejar = CookieJar()
|
||||
|
||||
|
||||
def get_configParser(conffile=None, force_read=False):
|
||||
"""
|
||||
Returns an ConfigParser() object. After its first invocation the
|
||||
@ -997,7 +645,7 @@ def write_initial_config(conffile, entries, custom_template='', creds_mgr_descri
|
||||
write_config(conffile, cp)
|
||||
|
||||
|
||||
def add_section(filename, url, user, passwd, creds_mgr_descriptor=None):
|
||||
def add_section(filename, url, user, passwd, creds_mgr_descriptor=None, allow_http=None):
|
||||
"""
|
||||
Add a section to config file for new api url.
|
||||
"""
|
||||
@ -1014,6 +662,8 @@ def add_section(filename, url, user, passwd, creds_mgr_descriptor=None):
|
||||
else:
|
||||
creds_mgr = _get_credentials_manager(url, cp)
|
||||
creds_mgr.set_password(url, user, passwd)
|
||||
if allow_http:
|
||||
cp.set(url, 'allow_http', "1")
|
||||
write_config(filename, cp)
|
||||
|
||||
|
||||
@ -1160,10 +810,10 @@ def get_config(override_conffile=None,
|
||||
'http_headers': http_headers}
|
||||
api_host_options[apiurl] = APIHostOptionsEntry(entry)
|
||||
|
||||
optional = ('realname', 'email', 'sslcertck', 'cafile', 'capath', 'sshkey')
|
||||
optional = ('realname', 'email', 'sslcertck', 'cafile', 'capath', 'sshkey', 'allow_http')
|
||||
for key in optional:
|
||||
if cp.has_option(url, key):
|
||||
if key == 'sslcertck':
|
||||
if key in ('sslcertck', 'allow_http'):
|
||||
api_host_options[apiurl][key] = cp.getboolean(url, key)
|
||||
else:
|
||||
api_host_options[apiurl][key] = cp.get(url, key)
|
||||
@ -1173,8 +823,8 @@ def get_config(override_conffile=None,
|
||||
if not 'sslcertck' in api_host_options[apiurl]:
|
||||
api_host_options[apiurl]['sslcertck'] = True
|
||||
|
||||
if scheme == 'http':
|
||||
api_host_options[apiurl]['sslcertck'] = False
|
||||
if 'allow_http' not in api_host_options[apiurl]:
|
||||
api_host_options[apiurl]['allow_http'] = False
|
||||
|
||||
if cp.has_option(url, 'trusted_prj'):
|
||||
api_host_options[apiurl]['trusted_prj'] = cp.get(url, 'trusted_prj').split(' ')
|
||||
@ -1246,8 +896,17 @@ def get_config(override_conffile=None,
|
||||
e.file = conffile
|
||||
raise e
|
||||
|
||||
# finally, initialize urllib2 for to use the credentials for Basic Authentication
|
||||
init_basicauth(config, os.stat(conffile).st_mtime)
|
||||
scheme = urlsplit(apiurl)[0]
|
||||
if scheme == "http" and not api_host_options[apiurl]['allow_http']:
|
||||
msg = "The apiurl '{apiurl}' uses HTTP protocol without any encryption.\n"
|
||||
msg += "All communication incl. sending your password IS NOT ENCRYPTED!\n"
|
||||
msg += "Add 'allow_http=1' to the [{apiurl}] config file section to mute this message.\n"
|
||||
print(msg.format(apiurl=apiurl), file=sys.stderr)
|
||||
|
||||
# enable connection debugging after all config options are set
|
||||
from .connection import enable_http_debug
|
||||
enable_http_debug(config)
|
||||
|
||||
|
||||
def identify_conf():
|
||||
# needed for compat reasons(users may have their oscrc still in ~
|
||||
@ -1264,6 +923,18 @@ def identify_conf():
|
||||
return conffile
|
||||
|
||||
def interactive_config_setup(conffile, apiurl, initial=True):
|
||||
scheme = urlsplit(apiurl)[0]
|
||||
http = scheme == "http"
|
||||
if http:
|
||||
msg = "The apiurl '{apiurl}' uses HTTP protocol without any encryption.\n"
|
||||
msg += "All communication incl. sending your password WILL NOT BE ENCRYPTED!\n"
|
||||
msg += "Do you really want to continue with no encryption?\n"
|
||||
print(msg.format(apiurl=apiurl), file=sys.stderr)
|
||||
yes = raw_input("Type 'YES' to continue: ")
|
||||
if yes != "YES":
|
||||
raise oscerr.UserAbort()
|
||||
print()
|
||||
|
||||
user = raw_input('Username: ')
|
||||
passwd = getpass.getpass()
|
||||
creds_mgr_descr = select_credentials_manager_descr()
|
||||
@ -1271,9 +942,11 @@ def interactive_config_setup(conffile, apiurl, initial=True):
|
||||
config = {'user': user, 'pass': passwd}
|
||||
if apiurl:
|
||||
config['apiurl'] = apiurl
|
||||
if http:
|
||||
config['allow_http'] = 1
|
||||
write_initial_config(conffile, config, creds_mgr_descriptor=creds_mgr_descr)
|
||||
else:
|
||||
add_section(conffile, apiurl, user, passwd, creds_mgr_descriptor=creds_mgr_descr)
|
||||
add_section(conffile, apiurl, user, passwd, creds_mgr_descriptor=creds_mgr_descr, allow_http=http)
|
||||
|
||||
def select_credentials_manager_descr():
|
||||
if not credentials.has_keyring_support():
|
||||
|
613
osc/connection.py
Normal file
613
osc/connection.py
Normal file
@ -0,0 +1,613 @@
|
||||
import base64
|
||||
import errno
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import ssl
|
||||
import sys
|
||||
import time
|
||||
|
||||
import http.client
|
||||
import http.cookiejar
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
import urllib3.exceptions
|
||||
import urllib3.poolmanager
|
||||
import urllib3.response
|
||||
import urllib3.util
|
||||
|
||||
from . import conf
|
||||
from . import oscerr
|
||||
from . import oscssl
|
||||
from .util.helper import decode_it
|
||||
|
||||
|
||||
class MockRequest:
|
||||
"""
|
||||
Mock a request object for `cookiejar.extract_cookies()`
|
||||
and `cookiejar.add_cookie_header()`.
|
||||
"""
|
||||
|
||||
def __init__(self, url, headers):
|
||||
self.url = url
|
||||
self.headers = headers
|
||||
self.unverifiable = False
|
||||
self.type = "https"
|
||||
|
||||
def get_full_url(self):
|
||||
return self.url
|
||||
|
||||
def get_header(self, header_name, default=None):
|
||||
return self.headers.get(header_name, default)
|
||||
|
||||
def has_header(self, header_name):
|
||||
return (header_name in self.headers)
|
||||
|
||||
def add_unredirected_header(self, key, val):
|
||||
# modifies the `headers` variable that was passed to object's constructor
|
||||
self.headers[key] = val
|
||||
|
||||
|
||||
def enable_http_debug(config):
|
||||
if not int(config["http_debug"]) and not int(config["http_full_debug"]):
|
||||
return
|
||||
|
||||
# HACK: override HTTPResponse's init to increase debug level
|
||||
old_HTTPResponse__init__ = http.client.HTTPResponse.__init__
|
||||
|
||||
def new_HTTPResponse__init__(self, *args, **kwargs):
|
||||
old_HTTPResponse__init__(self, *args, **kwargs)
|
||||
self.debuglevel = 1
|
||||
http.client.HTTPResponse.__init__ = new_HTTPResponse__init__
|
||||
|
||||
# increase HTTPConnection debug level
|
||||
http.client.HTTPConnection.debuglevel = 1
|
||||
|
||||
# HACK: because HTTPResponse's debug data uses print(),
|
||||
# let's inject custom print() function to that module
|
||||
def new_print(*args, file=None):
|
||||
if not int(config["http_full_debug"]) and args:
|
||||
# hide private data (authorization and cookies) when full debug is not enabled
|
||||
if args[:2] == ("header:", "Set-Cookie:"):
|
||||
return
|
||||
if args[0] == "send:":
|
||||
args = list(args)
|
||||
# (?<=...) - '...' must be present before the pattern (positive lookbehind assertion)
|
||||
args[1] = re.sub(r"(?<=\\r\\n)authorization:.*?\\r\\n", "", args[1], re.I)
|
||||
args[1] = re.sub(r"(?<=\\r\\n)Cookie:.*?\\r\\n", "", args[1], re.I)
|
||||
print("DEBUG:", *args, file=sys.stderr)
|
||||
http.client.print = new_print
|
||||
|
||||
|
||||
def get_proxy_manager(env):
|
||||
proxy_url = os.environ.get(env, None)
|
||||
|
||||
if not proxy_url:
|
||||
return
|
||||
|
||||
proxy_purl = urllib3.util.parse_url(proxy_url)
|
||||
|
||||
# rebuild proxy url in order to remove auth because ProxyManager would fail on it
|
||||
if proxy_purl.port:
|
||||
proxy_url = f"{proxy_purl.scheme}://{proxy_purl.host}:{proxy_purl.port}"
|
||||
else:
|
||||
proxy_url = f"{proxy_purl.scheme}://{proxy_purl.host}"
|
||||
|
||||
# import osc.core here to avoid cyclic imports
|
||||
from . import core
|
||||
|
||||
proxy_headers = urllib3.make_headers(
|
||||
proxy_basic_auth=proxy_purl.auth,
|
||||
user_agent=f"osc/{core.__version__}",
|
||||
)
|
||||
|
||||
manager = urllib3.ProxyManager(proxy_url, proxy_headers=proxy_headers)
|
||||
return manager
|
||||
|
||||
|
||||
# Instantiate on first use in `http_request()`.
|
||||
# Each `apiurl` requires a differently configured pool
|
||||
# (incl. trusted keys for example).
|
||||
CONNECTION_POOLS = {}
|
||||
|
||||
|
||||
# Pool manager for requests outside apiurls.
|
||||
POOL_MANAGER = urllib3.PoolManager()
|
||||
|
||||
|
||||
# Proxy manager for HTTP connections.
|
||||
HTTP_PROXY_MANAGER = get_proxy_manager("HTTP_PROXY")
|
||||
|
||||
|
||||
# Proxy manager for HTTPS connections.
|
||||
HTTPS_PROXY_MANAGER = get_proxy_manager("HTTPS_PROXY")
|
||||
|
||||
|
||||
def http_request(method, url, headers=None, data=None, file=None):
|
||||
"""
|
||||
Send a HTTP request to a server.
|
||||
|
||||
Features:
|
||||
* Authentication ([apiurl]/{user,pass} in oscrc)
|
||||
* Session cookie support (~/.local/state/osc/cookiejar)
|
||||
* SSL certificate verification incl. managing trusted certs
|
||||
* SSL certificate verification bypass (if [apiurl]/sslcertck=0 in oscrc)
|
||||
* Expired SSL certificates are no longer accepted. Either prolong them or set sslcertck=0.
|
||||
* Proxy support (HTTPS_PROXY env, NO_PROXY is respected)
|
||||
* Retries (http_retries in oscrc)
|
||||
* Requests outside apiurl (incl. proxy support)
|
||||
* Connection debugging (-H/--http-debug, --http-full-debug)
|
||||
|
||||
:param method: HTTP request method (such as GET, POST, PUT, DELETE).
|
||||
:param url: The URL to perform the request on.
|
||||
:param headers: Dictionary of custom headers to send.
|
||||
:param data: Data to send in the request body (conflicts with `file`).
|
||||
:param file: Path to a file to send as data in the request body (conflicts with `data`).
|
||||
"""
|
||||
|
||||
# import osc.core here to avoid cyclic imports
|
||||
from . import core
|
||||
|
||||
purl = urllib3.util.parse_url(url)
|
||||
apiurl = conf.extract_known_apiurl(url)
|
||||
headers = urllib3.response.HTTPHeaderDict(headers or {})
|
||||
|
||||
# identify osc
|
||||
headers.update(urllib3.make_headers(user_agent=f"osc/{core.__version__}"))
|
||||
|
||||
if data and file:
|
||||
raise RuntimeError('Specify either `data` or `file`')
|
||||
elif data:
|
||||
if hasattr(data, "encode"):
|
||||
data = data.encode("utf-8")
|
||||
content_length = len(data)
|
||||
elif file:
|
||||
content_length = os.path.getsize(file)
|
||||
data = open(file, "rb")
|
||||
else:
|
||||
content_length = 0
|
||||
|
||||
if content_length:
|
||||
headers.add("Content-Length", str(content_length))
|
||||
|
||||
# handle requests that go outside apiurl
|
||||
# do not set auth cookie or auth credentials
|
||||
if not apiurl:
|
||||
if purl.scheme == "http" and HTTP_PROXY_MANAGER and not urllib.request.proxy_bypass(url):
|
||||
# connection through proxy
|
||||
manager = HTTP_PROXY_MANAGER
|
||||
elif purl.scheme == "https" and HTTPS_PROXY_MANAGER and not urllib.request.proxy_bypass(url):
|
||||
# connection through proxy
|
||||
manager = HTTPS_PROXY_MANAGER
|
||||
else:
|
||||
# direct connection
|
||||
manager = POOL_MANAGER
|
||||
|
||||
response = manager.urlopen(method, url, body=data, headers=headers, preload_content=False)
|
||||
|
||||
if response.status / 100 != 2:
|
||||
raise urllib.error.HTTPError(url, response.status, response.reason, response.headers, response)
|
||||
|
||||
return response
|
||||
|
||||
options = conf.config["api_host_options"][apiurl]
|
||||
|
||||
global CONNECTION_POOLS
|
||||
pool = CONNECTION_POOLS.get(apiurl, None)
|
||||
if not pool:
|
||||
pool_kwargs = {}
|
||||
pool_kwargs["retries"] = int(conf.config["http_retries"])
|
||||
|
||||
if purl.scheme == "https":
|
||||
ssl_context = oscssl.create_ssl_context()
|
||||
ssl_context.load_default_certs()
|
||||
# turn cert verification off if sslcertck = 0
|
||||
pool_kwargs["cert_reqs"] = "CERT_REQUIRED" if options["sslcertck"] else "CERT_NONE"
|
||||
pool_kwargs["ssl_context"] = ssl_context
|
||||
|
||||
if purl.scheme == "http" and HTTP_PROXY_MANAGER and not urllib.request.proxy_bypass(url):
|
||||
# connection through HTTP proxy
|
||||
pool = HTTP_PROXY_MANAGER.connection_from_host(
|
||||
host=purl.host,
|
||||
port=purl.port,
|
||||
scheme=purl.scheme,
|
||||
pool_kwargs=pool_kwargs
|
||||
)
|
||||
HTTP_PROXY_MANAGER.request('GET', url)
|
||||
elif purl.scheme == "https" and HTTPS_PROXY_MANAGER and not urllib.request.proxy_bypass(url):
|
||||
# connection through HTTPS proxy
|
||||
pool = HTTPS_PROXY_MANAGER.connection_from_host(
|
||||
host=purl.host,
|
||||
port=purl.port,
|
||||
scheme=purl.scheme,
|
||||
pool_kwargs=pool_kwargs
|
||||
)
|
||||
elif purl.scheme == "https":
|
||||
# direct connection
|
||||
pool = urllib3.HTTPSConnectionPool(host=purl.host, port=purl.port, **pool_kwargs)
|
||||
else:
|
||||
pool = urllib3.HTTPConnectionPool(host=purl.host, port=purl.port, **pool_kwargs)
|
||||
|
||||
if purl.scheme == "https":
|
||||
# inject ssl context instance into pool so we can use it later
|
||||
pool.ssl_context = ssl_context
|
||||
|
||||
# inject trusted cert store instance into pool so we can use it later
|
||||
pool.trusted_cert_store = oscssl.TrustedCertStore(ssl_context, purl.host, purl.port)
|
||||
|
||||
CONNECTION_POOLS[apiurl] = pool
|
||||
|
||||
auth_handlers = [
|
||||
CookieJarAuthHandler(os.path.expanduser(conf.config["cookiejar"])),
|
||||
SignatureAuthHandler(options["user"], options["sshkey"], options["pass"]),
|
||||
BasicAuthHandler(options["user"], options["pass"]),
|
||||
]
|
||||
|
||||
for handler in auth_handlers:
|
||||
# authenticate using a cookie (if available)
|
||||
success = handler.set_request_headers(url, headers)
|
||||
if success:
|
||||
break
|
||||
|
||||
if data or file:
|
||||
# osc/obs data is usually XML
|
||||
headers.add("Content-Type", "application/xml; charset=utf-8")
|
||||
|
||||
if purl.scheme == "http" and HTTP_PROXY_MANAGER:
|
||||
# HTTP proxy requires full URL with 'same host' checking off
|
||||
urlopen_url = url
|
||||
assert_same_host = False
|
||||
else:
|
||||
# everything else is fine with path only
|
||||
# join path and query, ignore the remaining args; args are (scheme, netloc, path, query, fragment)
|
||||
urlopen_url = urllib.parse.urlunsplit(("", "", purl.path, purl.query, ""))
|
||||
assert_same_host = True
|
||||
|
||||
if int(conf.config['http_debug']):
|
||||
# use the hacked print() for consistency
|
||||
http.client.print(40 * '-')
|
||||
http.client.print(method, url)
|
||||
|
||||
try:
|
||||
response = pool.urlopen(
|
||||
method, urlopen_url, body=data, headers=headers,
|
||||
preload_content=False, assert_same_host=assert_same_host
|
||||
)
|
||||
except urllib3.exceptions.MaxRetryError as e:
|
||||
if not isinstance(e.reason, urllib3.exceptions.SSLError):
|
||||
# re-raise exceptions that are not related to SSL
|
||||
raise
|
||||
|
||||
if isinstance(e.reason.args[0], ssl.SSLCertVerificationError):
|
||||
self_signed_verify_codes = (
|
||||
oscssl.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT,
|
||||
oscssl.X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN,
|
||||
)
|
||||
if e.reason.args[0].verify_code not in self_signed_verify_codes:
|
||||
# re-raise ssl exceptions that are not related to self-signed certs
|
||||
raise e.reason.args[0] from None
|
||||
else:
|
||||
# re-raise other than ssl exceptions
|
||||
raise e.reason.args[0] from None
|
||||
|
||||
# get the untrusted certificated from server
|
||||
cert = pool.trusted_cert_store.get_server_certificate()
|
||||
|
||||
# prompt user if we should trust the certificate
|
||||
pool.trusted_cert_store.prompt_trust(cert, reason=e.reason)
|
||||
|
||||
response = pool.urlopen(
|
||||
method, urlopen_url, body=data, headers=headers,
|
||||
preload_content=False, assert_same_host=assert_same_host
|
||||
)
|
||||
|
||||
if response.status == 401:
|
||||
# session cookie has expired, re-authenticate
|
||||
for handler in auth_handlers:
|
||||
success = handler.set_request_headers_after_401(url, headers, response)
|
||||
if success:
|
||||
break
|
||||
response = pool.urlopen(method, urlopen_url, body=data, headers=headers, preload_content=False)
|
||||
|
||||
if response.status / 100 != 2:
|
||||
raise urllib.error.HTTPError(url, response.status, response.reason, response.headers, response)
|
||||
|
||||
for handler in auth_handlers:
|
||||
handler.process_response(url, headers, response)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
# pylint: disable=C0103,C0116
|
||||
def http_GET(*args, **kwargs):
|
||||
return http_request("GET", *args, **kwargs)
|
||||
|
||||
|
||||
# pylint: disable=C0103,C0116
|
||||
def http_POST(*args, **kwargs):
|
||||
return http_request("POST", *args, **kwargs)
|
||||
|
||||
|
||||
# pylint: disable=C0103,C0116
|
||||
def http_PUT(*args, **kwargs):
|
||||
return http_request("PUT", *args, **kwargs)
|
||||
|
||||
|
||||
# pylint: disable=C0103,C0116
|
||||
def http_DELETE(*args, **kwargs):
|
||||
return http_request("DELETE", *args, **kwargs)
|
||||
|
||||
|
||||
class AuthHandlerBase:
|
||||
def _get_auth_schemes(self, response):
|
||||
"""
|
||||
Extract all `www-authenticate` headers from `response` and return them
|
||||
in a dictionary: `{scheme: auth_method}`.
|
||||
"""
|
||||
result = {}
|
||||
for auth_method in response.headers.get_all("www-authenticate", []):
|
||||
scheme = auth_method.split()[0].lower()
|
||||
result[scheme] = auth_method
|
||||
return result
|
||||
|
||||
def set_request_headers(self, url, request_headers):
|
||||
"""
|
||||
Modify request headers with auth headers.
|
||||
|
||||
:param url: Request URL provides context for `request_headers` modifications
|
||||
:type url: str
|
||||
:param request_headers: object to be modified
|
||||
:type request_headers: urllib3.response.HTTPHeaderDict
|
||||
:return: `True` on if `request_headers` was modified, `False` otherwise
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def set_request_headers_after_401(self, url, request_headers, response):
|
||||
"""
|
||||
Modify request headers with auth headers after getting 401 response.
|
||||
|
||||
:param url: Request URL provides context for `request_headers` modifications
|
||||
:type url: str
|
||||
:param request_headers: object to be modified
|
||||
:type request_headers: urllib3.response.HTTPHeaderDict
|
||||
:param response: Response object provides context for `request_headers` modifications
|
||||
:type response: urllib3.response.HTTPResponse
|
||||
:return: `True` on if `request_headers` was modified, `False` otherwise
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def process_response(self, url, request_headers, response):
|
||||
"""
|
||||
Retrieve data from response, save cookies, etc.
|
||||
|
||||
:param url: Request URL provides context for `request_headers` modifications
|
||||
:type url: str
|
||||
:param request_headers: object to be modified
|
||||
:type request_headers: urllib3.response.HTTPHeaderDict
|
||||
:param response: Response object provides context for `request_headers` modifications
|
||||
:type response: urllib3.response.HTTPResponse
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class CookieJarAuthHandler(AuthHandlerBase):
|
||||
# Shared among instances, instantiate on first use, key equals too cookiejar path.
|
||||
COOKIEJARS = {}
|
||||
|
||||
def __init__(self, cookiejar_path):
|
||||
self.cookiejar_path = cookiejar_path
|
||||
|
||||
@property
|
||||
def _cookiejar(self):
|
||||
jar = self.COOKIEJARS.get(self.cookiejar_path, None)
|
||||
if not jar:
|
||||
try:
|
||||
os.makedirs(os.path.dirname(self.cookiejar_path), mode=0o700)
|
||||
except OSError as e:
|
||||
if e.errno != errno.EEXIST:
|
||||
raise
|
||||
jar = http.cookiejar.LWPCookieJar(self.cookiejar_path)
|
||||
if os.path.isfile(self.cookiejar_path):
|
||||
jar.load()
|
||||
self.COOKIEJARS[self.cookiejar_path] = jar
|
||||
return jar
|
||||
|
||||
def set_request_headers(self, url, request_headers):
|
||||
self._cookiejar.add_cookie_header(MockRequest(url, request_headers))
|
||||
return bool(request_headers.get_all("cookie", None))
|
||||
|
||||
def set_request_headers_after_401(self, url, request_headers, response):
|
||||
# can't do anything, we have tried setting a cookie already
|
||||
return False
|
||||
|
||||
def process_response(self, url, request_headers, response):
|
||||
self._cookiejar.extract_cookies(response, MockRequest(url, response.headers))
|
||||
self._cookiejar.save()
|
||||
|
||||
|
||||
class BasicAuthHandler(AuthHandlerBase):
|
||||
def __init__(self, user, password):
|
||||
self.user = user
|
||||
self.password = password
|
||||
|
||||
def set_request_headers(self, url, request_headers):
|
||||
return False
|
||||
|
||||
def set_request_headers_after_401(self, url, request_headers, response):
|
||||
auth_schemes = self._get_auth_schemes(response)
|
||||
if "basic" not in auth_schemes:
|
||||
return False
|
||||
if not self.user or not self.password:
|
||||
return False
|
||||
request_headers.update(urllib3.make_headers(basic_auth=f"{self.user}:{self.password}"))
|
||||
return True
|
||||
|
||||
def process_response(self, url, request_headers, response):
|
||||
pass
|
||||
|
||||
|
||||
class SignatureAuthHandler(AuthHandlerBase):
|
||||
def __init__(self, user, sshkey, basic_auth_password=None):
|
||||
self.user = user
|
||||
self.sshkey = sshkey
|
||||
# value of `basic_auth_password` is only used as a hint if we should skip signature auth
|
||||
self.basic_auth_password = bool(basic_auth_password)
|
||||
|
||||
def list_ssh_agent_keys(self):
|
||||
cmd = ['ssh-add', '-l']
|
||||
try:
|
||||
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
except OSError:
|
||||
# ssh-add is not available
|
||||
return []
|
||||
stdout, _ = proc.communicate()
|
||||
if proc.returncode == 0 and stdout.strip():
|
||||
return [self.get_fingerprint(line) for line in stdout.splitlines()]
|
||||
else:
|
||||
return []
|
||||
|
||||
def is_ssh_private_keyfile(self, keyfile_path):
|
||||
if not os.path.isfile(keyfile_path):
|
||||
return False
|
||||
with open(keyfile_path, "r") as f:
|
||||
try:
|
||||
line = f.readline(100).strip()
|
||||
except UnicodeDecodeError:
|
||||
# skip binary files
|
||||
return False
|
||||
if line == "-----BEGIN RSA PRIVATE KEY-----":
|
||||
return True
|
||||
if line == "-----BEGIN OPENSSH PRIVATE KEY-----":
|
||||
return True
|
||||
return False
|
||||
|
||||
def is_ssh_public_keyfile(self, keyfile_path):
|
||||
if not os.path.isfile(keyfile_path):
|
||||
return False
|
||||
return keyfile_path.endswith(".pub")
|
||||
|
||||
@staticmethod
|
||||
def get_fingerprint(line):
|
||||
parts = line.strip().split(b" ")
|
||||
if len(parts) < 2:
|
||||
raise ValueError("Unable to retrieve ssh key fingerprint from line: {}".format(line))
|
||||
return parts[1]
|
||||
|
||||
def list_ssh_dir_keys(self):
|
||||
sshdir = os.path.expanduser('~/.ssh')
|
||||
keys_in_home_ssh = {}
|
||||
for keyfile in os.listdir(sshdir):
|
||||
if keyfile.startswith(("agent-", "authorized_keys", "config", "known_hosts")):
|
||||
# skip files that definitely don't contain keys
|
||||
continue
|
||||
|
||||
keyfile_path = os.path.join(sshdir, keyfile)
|
||||
# public key alone may be sufficient because the private key
|
||||
# can get loaded into ssh-agent from gpg (yubikey works this way)
|
||||
is_public = self.is_ssh_public_keyfile(keyfile_path)
|
||||
# skip private detection if we think the key is a public one already
|
||||
is_private = False if is_public else self.is_ssh_private_keyfile(keyfile_path)
|
||||
|
||||
if not is_public and not is_private:
|
||||
continue
|
||||
|
||||
cmd = ["ssh-keygen", "-lf", keyfile_path]
|
||||
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
stdout, _ = proc.communicate()
|
||||
if proc.returncode == 0:
|
||||
fingerprint = self.get_fingerprint(stdout)
|
||||
if fingerprint and (fingerprint not in keys_in_home_ssh or is_private):
|
||||
# prefer path to a private key
|
||||
keys_in_home_ssh[fingerprint] = keyfile_path
|
||||
return keys_in_home_ssh
|
||||
|
||||
def guess_keyfile(self):
|
||||
keys_in_agent = self.list_ssh_agent_keys()
|
||||
if keys_in_agent:
|
||||
keys_in_home_ssh = self.list_ssh_dir_keys()
|
||||
for fingerprint in keys_in_agent:
|
||||
if fingerprint in keys_in_home_ssh:
|
||||
return keys_in_home_ssh[fingerprint]
|
||||
sshdir = os.path.expanduser('~/.ssh')
|
||||
keyfiles = ('id_ed25519', 'id_ed25519_sk', 'id_rsa', 'id_ecdsa', 'id_ecdsa_sk', 'id_dsa')
|
||||
for keyfile in keyfiles:
|
||||
keyfile_path = os.path.join(sshdir, keyfile)
|
||||
if os.path.isfile(keyfile_path):
|
||||
return keyfile_path
|
||||
return None
|
||||
|
||||
def ssh_sign(self, data, namespace, keyfile=None):
|
||||
try:
|
||||
data = bytes(data, 'utf-8')
|
||||
except:
|
||||
pass
|
||||
if not keyfile:
|
||||
keyfile = self.guess_keyfile()
|
||||
else:
|
||||
if '/' not in keyfile:
|
||||
keyfile = '~/.ssh/' + keyfile
|
||||
keyfile = os.path.expanduser(keyfile)
|
||||
|
||||
cmd = ['ssh-keygen', '-Y', 'sign', '-f', keyfile, '-n', namespace, '-q']
|
||||
proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
|
||||
stdout, _ = proc.communicate(data)
|
||||
if proc.returncode:
|
||||
raise oscerr.OscIOError(None, 'ssh-keygen signature creation failed: %d' % proc.returncode)
|
||||
|
||||
signature = decode_it(stdout)
|
||||
match = re.match(r"\A-----BEGIN SSH SIGNATURE-----\n(.*)\n-----END SSH SIGNATURE-----", signature, re.S)
|
||||
if not match:
|
||||
raise oscerr.OscIOError(None, 'could not extract ssh signature')
|
||||
return base64.b64decode(match.group(1))
|
||||
|
||||
def get_authorization(self, chal):
|
||||
realm = chal.get('realm', '')
|
||||
now = int(time.time())
|
||||
sigdata = "(created): %d" % now
|
||||
signature = self.ssh_sign(sigdata, realm, self.sshkey)
|
||||
signature = decode_it(base64.b64encode(signature))
|
||||
return 'keyId="%s",algorithm="ssh",headers="(created)",created=%d,signature="%s"' \
|
||||
% (self.user, now, signature)
|
||||
|
||||
def add_signature_auth_header(self, req, auth):
|
||||
token, challenge = auth.split(' ', 1)
|
||||
chal = urllib.request.parse_keqv_list(filter(None, urllib.request.parse_http_list(challenge)))
|
||||
auth = self.get_authorization(chal)
|
||||
if not auth:
|
||||
return False
|
||||
auth_val = 'Signature %s' % auth
|
||||
req.add('Authorization', auth_val)
|
||||
return True
|
||||
|
||||
def set_request_headers(self, url, request_headers):
|
||||
return False
|
||||
|
||||
def set_request_headers_after_401(self, url, request_headers, response):
|
||||
auth_schemes = self._get_auth_schemes(response)
|
||||
|
||||
if "signature" not in auth_schemes:
|
||||
# unsupported on server
|
||||
return False
|
||||
|
||||
if not self.user:
|
||||
return False
|
||||
|
||||
if self.basic_auth_password and "basic" in auth_schemes:
|
||||
# prefer basic auth, but only if password is set
|
||||
return False
|
||||
|
||||
if not self.sshkey_known():
|
||||
# ssh key not set, try to guess it
|
||||
self.sshkey = self.guess_keyfile()
|
||||
|
||||
if not self.sshkey_known():
|
||||
# ssh key cannot be guessed
|
||||
return False
|
||||
|
||||
return self.add_signature_auth_header(request_headers, auth_schemes["signature"])
|
||||
|
||||
def process_response(self, url, request_headers, response):
|
||||
pass
|
||||
|
||||
def sshkey_known(self):
|
||||
return self.sshkey is not None
|
102
osc/core.py
102
osc/core.py
@ -34,21 +34,11 @@ try:
|
||||
except ImportError:
|
||||
distro = None
|
||||
|
||||
try:
|
||||
from urllib.parse import urlsplit, urlunsplit, urlparse, quote_plus, urlencode, unquote
|
||||
from urllib.error import HTTPError, URLError
|
||||
from urllib.request import pathname2url, install_opener, urlopen
|
||||
from urllib.request import Request as URLRequest
|
||||
from io import StringIO
|
||||
from http.client import IncompleteRead
|
||||
except ImportError:
|
||||
#python 2.x
|
||||
from urlparse import urlsplit, urlunsplit, urlparse
|
||||
from urllib import pathname2url, quote_plus, urlencode, unquote
|
||||
from urllib2 import HTTPError, URLError, install_opener, urlopen
|
||||
from urllib2 import Request as URLRequest
|
||||
from cStringIO import StringIO
|
||||
from httplib import IncompleteRead
|
||||
from urllib.parse import urlsplit, urlunsplit, urlparse, quote_plus, urlencode, unquote
|
||||
from urllib.error import HTTPError
|
||||
from urllib.request import pathname2url
|
||||
from io import StringIO
|
||||
from http.client import IncompleteRead
|
||||
|
||||
try:
|
||||
# Works up to Python 3.8, needed for Python < 3.3 (inc 2.7)
|
||||
@ -60,6 +50,7 @@ except ImportError:
|
||||
|
||||
from . import oscerr
|
||||
from . import conf
|
||||
from .connection import http_request, http_GET, http_POST, http_PUT, http_DELETE
|
||||
|
||||
try:
|
||||
from functools import cmp_to_key
|
||||
@ -3382,87 +3373,6 @@ def makeurl(baseurl, l, query=[]):
|
||||
return urlunsplit((scheme, netloc, '/'.join([path] + list(l)), query, ''))
|
||||
|
||||
|
||||
def http_request(method, url, headers={}, data=None, file=None):
|
||||
"""wrapper around urllib2.urlopen for error handling,
|
||||
and to support additional (PUT, DELETE) methods"""
|
||||
class DataContext:
|
||||
"""Wrap a data value (or None) in a context manager."""
|
||||
|
||||
def __init__(self, data):
|
||||
self._data = data
|
||||
|
||||
def __enter__(self):
|
||||
return self._data
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
return None
|
||||
|
||||
|
||||
if file is not None and data is not None:
|
||||
raise RuntimeError('file and data are mutually exclusive')
|
||||
|
||||
if conf.config['http_debug']:
|
||||
print('\n\n--', method, url, file=sys.stderr)
|
||||
|
||||
if method == 'POST' and not file and not data:
|
||||
# adding data to an urllib2 request transforms it into a POST
|
||||
data = b''
|
||||
|
||||
req = URLRequest(url)
|
||||
api_host_options = {}
|
||||
apiurl = conf.extract_known_apiurl(url)
|
||||
if apiurl is not None:
|
||||
# ok no external request
|
||||
install_opener(conf._build_opener(apiurl))
|
||||
api_host_options = conf.get_apiurl_api_host_options(apiurl)
|
||||
for header, value in api_host_options['http_headers']:
|
||||
req.add_header(header, value)
|
||||
|
||||
req.get_method = lambda: method
|
||||
|
||||
# POST requests are application/x-www-form-urlencoded per default
|
||||
# but sending data requires an octet-stream type
|
||||
if method == 'PUT' or (method == 'POST' and (data or file)):
|
||||
req.add_header('Content-Type', 'application/octet-stream')
|
||||
|
||||
if isinstance(headers, type({})):
|
||||
for i in headers.keys():
|
||||
print(headers[i])
|
||||
req.add_header(i, headers[i])
|
||||
|
||||
if conf.config['debug']: print(method, url, file=sys.stderr)
|
||||
|
||||
content_length = None
|
||||
if data is not None:
|
||||
if isinstance(data, str):
|
||||
data = data.encode('utf-8')
|
||||
content_length = len(data)
|
||||
elif file is not None:
|
||||
content_length = os.path.getsize(file)
|
||||
|
||||
with (open(file, 'rb') if file is not None else DataContext(data)) as d:
|
||||
req.data = d
|
||||
if content_length is not None:
|
||||
# do this after setting req.data because the corresponding setter
|
||||
# kills an existing Content-Length header (see urllib.Request class
|
||||
# (python38))
|
||||
req.add_header('Content-Length', str(content_length))
|
||||
try:
|
||||
return urlopen(req)
|
||||
except URLError as e:
|
||||
e._osc_host_port = req.host
|
||||
raise
|
||||
finally:
|
||||
if hasattr(conf.cookiejar, 'save'):
|
||||
conf.cookiejar.save(ignore_discard=True)
|
||||
|
||||
|
||||
def http_GET(*args, **kwargs): return http_request('GET', *args, **kwargs)
|
||||
def http_POST(*args, **kwargs): return http_request('POST', *args, **kwargs)
|
||||
def http_PUT(*args, **kwargs): return http_request('PUT', *args, **kwargs)
|
||||
def http_DELETE(*args, **kwargs): return http_request('DELETE', *args, **kwargs)
|
||||
|
||||
|
||||
def check_store_version(dir):
|
||||
global store
|
||||
|
||||
|
521
osc/oscssl.py
521
osc/oscssl.py
@ -1,414 +1,161 @@
|
||||
# Copyright (C) 2009 Novell Inc.
|
||||
# This program is free software; it may be used, copied, modified
|
||||
# and distributed under the terms of the GNU General Public Licence,
|
||||
# either version 2, or (at your option) any later version.
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
from M2Crypto.SSL.Checker import SSLVerificationError
|
||||
from M2Crypto import m2, SSL, httpslib
|
||||
import M2Crypto.m2urllib2
|
||||
import binascii
|
||||
import datetime
|
||||
import errno
|
||||
import os
|
||||
import socket
|
||||
import ssl
|
||||
import sys
|
||||
import inspect
|
||||
import tempfile
|
||||
import typing
|
||||
|
||||
try:
|
||||
from urllib.parse import urlparse, splithost, splitport, splittype, urldefrag
|
||||
from urllib.request import addinfourl
|
||||
from http.client import HTTPSConnection
|
||||
except ImportError:
|
||||
#python 2.x
|
||||
from urlparse import urlparse, urldefrag
|
||||
from urllib import addinfourl, splithost, splitport, splittype
|
||||
from httplib import HTTPSConnection
|
||||
import urllib3.connection
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from urllib3.util.ssl_ import create_urllib3_context
|
||||
|
||||
from . import oscerr
|
||||
|
||||
|
||||
# based on openssl's include/openssl/x509_vfy.h.in
|
||||
X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT = 18
|
||||
X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN = 19
|
||||
|
||||
|
||||
def create_ssl_context():
|
||||
"""
|
||||
Create a ssl context with disabled weak crypto.
|
||||
|
||||
Relatively safe defaults are set in urllib3 already,
|
||||
but we restrict crypto even more.
|
||||
"""
|
||||
ssl_context = create_urllib3_context()
|
||||
ssl_context.options |= ssl.OP_NO_SSLv2
|
||||
ssl_context.options |= ssl.OP_NO_SSLv3
|
||||
ssl_context.options |= ssl.OP_NO_TLSv1
|
||||
ssl_context.options |= ssl.OP_NO_TLSv1_1
|
||||
return ssl_context
|
||||
|
||||
|
||||
class CertVerificationError(oscerr.OscBaseError):
|
||||
def __str__(self):
|
||||
args_str = [str(i) for i in self.args]
|
||||
return "Certificate Verification Error: " + "\n".join(args_str)
|
||||
|
||||
from .core import raw_input
|
||||
|
||||
class TrustedCertStore:
|
||||
_tmptrusted = {}
|
||||
|
||||
def __init__(self, host, port, app, cert):
|
||||
|
||||
self.cert = cert
|
||||
def __init__(self, ssl_context, host, port):
|
||||
self.ssl_context = ssl_context
|
||||
self.host = host
|
||||
if self.host == None:
|
||||
raise Exception("empty host")
|
||||
if port:
|
||||
self.host += "_%d" % port
|
||||
import os
|
||||
self.dir = os.path.expanduser('~/.config/%s/trusted-certs' % app)
|
||||
self.file = self.dir + '/%s.pem' % self.host
|
||||
self.port = port or 443
|
||||
|
||||
def is_known(self):
|
||||
if self.host in self._tmptrusted:
|
||||
return True
|
||||
if not self.host:
|
||||
raise ValueError("Empty `host`")
|
||||
|
||||
import os
|
||||
if os.path.exists(self.file):
|
||||
return True
|
||||
return False
|
||||
file_name = f"{self.host}_{self.port}"
|
||||
self.dir_path = os.path.expanduser("~/.config/osc/trusted-certs")
|
||||
self.pem_path = os.path.join(self.dir_path, f"{file_name}.pem")
|
||||
|
||||
def is_trusted(self):
|
||||
import os
|
||||
if self.host in self._tmptrusted:
|
||||
cert = self._tmptrusted[self.host]
|
||||
if os.path.isfile(self.pem_path):
|
||||
# load permanently trusted certificate that is stored on disk
|
||||
with open(self.pem_path, "rb") as f:
|
||||
self.cert = x509.load_pem_x509_certificate(f.read())
|
||||
self.ssl_context.load_verify_locations(cafile=self.pem_path)
|
||||
else:
|
||||
if not os.path.exists(self.file):
|
||||
return False
|
||||
from M2Crypto import X509
|
||||
cert = X509.load_cert(self.file)
|
||||
if self.cert.as_pem() == cert.as_pem():
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
self.cert = None
|
||||
|
||||
def trust_tmp(self):
|
||||
self._tmptrusted[self.host] = self.cert
|
||||
def get_server_certificate(self):
|
||||
# The following code throws an exception on self-signed certs,
|
||||
# therefore we need to retrieve the cert differently.
|
||||
# pem = ssl.get_server_certificate((self.host, self.port))
|
||||
|
||||
def trust_always(self):
|
||||
self.trust_tmp()
|
||||
from M2Crypto import X509
|
||||
import os
|
||||
if not os.path.exists(self.dir):
|
||||
os.makedirs(self.dir)
|
||||
self.cert.save_pem(self.file)
|
||||
ssl_context = create_ssl_context()
|
||||
ssl_context.check_hostname = False
|
||||
ssl_context.verify_mode = ssl.CERT_NONE
|
||||
sock = ssl_context.wrap_socket(socket.socket(), server_hostname=self.host)
|
||||
sock.connect((self.host, self.port))
|
||||
der = sock.getpeercert(binary_form=True)
|
||||
pem = ssl.DER_cert_to_PEM_cert(der)
|
||||
cert = x509.load_pem_x509_certificate(pem.encode("utf-8"))
|
||||
return cert
|
||||
|
||||
|
||||
# verify_cb is called for each error once
|
||||
# we only collect the errors and return suceess
|
||||
# connection will be aborted later if it needs to
|
||||
def verify_cb(ctx, ok, store):
|
||||
if not ctx.verrs:
|
||||
ctx.verrs = ValidationErrors()
|
||||
|
||||
try:
|
||||
if not ok:
|
||||
ctx.verrs.record(store.get_current_cert(), store.get_error(), store.get_error_depth())
|
||||
return 1
|
||||
|
||||
except Exception as e:
|
||||
print(e, file=sys.stderr)
|
||||
return 0
|
||||
|
||||
class FailCert:
|
||||
def __init__(self, cert):
|
||||
def trust_permanently(self, cert):
|
||||
"""
|
||||
Permanently trust the certificate.
|
||||
Store it as a pem file in ~/.config/osc/trusted-certs.
|
||||
"""
|
||||
self.cert = cert
|
||||
self.errs = []
|
||||
data = self.cert.public_bytes(serialization.Encoding.PEM)
|
||||
with open(self.pem_path, "wb") as f:
|
||||
f.write(data)
|
||||
self.ssl_context.load_verify_locations(cafile=self.pem_path)
|
||||
|
||||
class ValidationErrors:
|
||||
def trust_temporarily(self, cert):
|
||||
"""
|
||||
Temporarily trust the certificate.
|
||||
"""
|
||||
self.cert = cert
|
||||
tmp_dir = os.path.expanduser("~/.config/osc")
|
||||
data = self.cert.public_bytes(serialization.Encoding.PEM)
|
||||
with tempfile.NamedTemporaryFile(mode="wb+", dir=tmp_dir, prefix="temp_trusted_cert_") as f:
|
||||
f.write(data)
|
||||
f.flush()
|
||||
self.ssl_context.load_verify_locations(cafile=f.name)
|
||||
|
||||
def __init__(self):
|
||||
self.chain_ok = True
|
||||
self.cert_ok = True
|
||||
self.failures = {}
|
||||
|
||||
def record(self, cert, err, depth):
|
||||
#print "cert for %s, level %d fail(%d)" % ( cert.get_subject().commonName, depth, err )
|
||||
if depth == 0:
|
||||
self.cert_ok = False
|
||||
else:
|
||||
self.chain_ok = False
|
||||
|
||||
if not depth in self.failures:
|
||||
self.failures[depth] = FailCert(cert)
|
||||
else:
|
||||
if self.failures[depth].cert.get_fingerprint() != cert.get_fingerprint():
|
||||
raise Exception("Certificate changed unexpectedly. This should not happen")
|
||||
self.failures[depth].errs.append(err)
|
||||
|
||||
def show(self, out):
|
||||
for depth in self.failures.keys():
|
||||
cert = self.failures[depth].cert
|
||||
print("*** certificate verify failed at depth %d" % depth, file=out)
|
||||
print("Subject: ", cert.get_subject(), file=out)
|
||||
print("Issuer: ", cert.get_issuer(), file=out)
|
||||
print("Valid: ", cert.get_not_before(), "-", cert.get_not_after(), file=out)
|
||||
print("Fingerprint(MD5): ", cert.get_fingerprint('md5'), file=out)
|
||||
print("Fingerprint(SHA1): ", cert.get_fingerprint('sha1'), file=out)
|
||||
|
||||
for err in self.failures[depth].errs:
|
||||
reason = "Unknown"
|
||||
try:
|
||||
import M2Crypto.Err
|
||||
reason = M2Crypto.Err.get_x509_verify_error(err)
|
||||
except:
|
||||
pass
|
||||
print("Reason:", reason, file=out)
|
||||
|
||||
# check if the encountered errors could be ignored
|
||||
def could_ignore(self):
|
||||
if not 0 in self.failures:
|
||||
return True
|
||||
|
||||
nonfatal_errors = [
|
||||
m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY,
|
||||
m2.X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN,
|
||||
m2.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT,
|
||||
m2.X509_V_ERR_CERT_UNTRUSTED,
|
||||
m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE,
|
||||
|
||||
m2.X509_V_ERR_CERT_NOT_YET_VALID,
|
||||
m2.X509_V_ERR_CERT_HAS_EXPIRED,
|
||||
m2.X509_V_OK,
|
||||
]
|
||||
|
||||
canignore = True
|
||||
for err in self.failures[0].errs:
|
||||
if not err in nonfatal_errors:
|
||||
canignore = False
|
||||
break
|
||||
|
||||
return canignore
|
||||
|
||||
class mySSLContext(SSL.Context):
|
||||
|
||||
def __init__(self):
|
||||
SSL.Context.__init__(self, 'sslv23')
|
||||
self.set_options(m2.SSL_OP_NO_SSLv2 | m2.SSL_OP_NO_SSLv3)
|
||||
self.set_cipher_list("ECDHE-RSA-AES128-SHA256:AES128-GCM-SHA256:RC4:HIGH:!MD5:!aNULL:!EDH")
|
||||
self.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT)
|
||||
self.verrs = None
|
||||
#self.set_info_callback() # debug
|
||||
self.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, depth=9, callback=lambda ok, store: verify_cb(self, ok, store))
|
||||
|
||||
class myHTTPSHandler(M2Crypto.m2urllib2.HTTPSHandler):
|
||||
handler_order = 499
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.appname = kwargs.pop('appname', 'generic')
|
||||
M2Crypto.m2urllib2.HTTPSHandler.__init__(self, *args, **kwargs)
|
||||
|
||||
# copied from M2Crypto.m2urllib2.HTTPSHandler
|
||||
# it's sole purpose is to use our myHTTPSHandler/myHTTPSProxyHandler class
|
||||
# ideally the m2urllib2.HTTPSHandler.https_open() method would be split into
|
||||
# "do_open()" and "https_open()" so that we just need to override
|
||||
# the small "https_open()" method...)
|
||||
def https_open(self, req):
|
||||
# https://docs.python.org/3.3/library/urllib.request.html#urllib.request.Request.get_host
|
||||
try: # up to python-3.2
|
||||
host = req.get_host()
|
||||
except AttributeError: # from python-3.3
|
||||
host = req.host
|
||||
if not host:
|
||||
raise M2Crypto.m2urllib2.URLError('no host given')
|
||||
|
||||
# Our change: Check to see if we're using a proxy.
|
||||
# Then create an appropriate ssl-aware connection.
|
||||
full_url = req.get_full_url()
|
||||
target_host = urlparse(full_url)[1]
|
||||
|
||||
if target_host != host:
|
||||
request_uri = urldefrag(full_url)[0]
|
||||
h = myProxyHTTPSConnection(host=host, appname=self.appname, ssl_context=self.ctx)
|
||||
else:
|
||||
try: # up to python-3.2
|
||||
request_uri = req.get_selector()
|
||||
except AttributeError: # from python-3.3
|
||||
request_uri = req.selector
|
||||
h = myHTTPSConnection(host=host, appname=self.appname, ssl_context=self.ctx)
|
||||
# End our change
|
||||
h.set_debuglevel(self._debuglevel)
|
||||
|
||||
headers = dict(req.headers)
|
||||
headers.update(req.unredirected_hdrs)
|
||||
# We want to make an HTTP/1.1 request, but the addinfourl
|
||||
# class isn't prepared to deal with a persistent connection.
|
||||
# It will try to read all remaining data from the socket,
|
||||
# which will block while the server waits for the next request.
|
||||
# So make sure the connection gets closed after the (only)
|
||||
# request.
|
||||
headers["Connection"] = "close"
|
||||
@staticmethod
|
||||
def _display_cert(cert):
|
||||
print("Subject:", cert.subject.rfc4514_string())
|
||||
print("Issuer:", cert.issuer.rfc4514_string())
|
||||
try:
|
||||
h.request(req.get_method(), request_uri, req.data, headers)
|
||||
r = h.getresponse()
|
||||
except socket.error as err: # XXX what error?
|
||||
raise M2Crypto.m2urllib2.URLError(err)
|
||||
san_ext = cert.extensions.get_extension_for_oid(x509.ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
|
||||
san_ext_value = typing.cast(x509.SubjectAlternativeName, san_ext.value)
|
||||
san_ext_dnsnames = san_ext_value.get_values_for_type(x509.DNSName)
|
||||
except x509.extensions.ExtensionNotFound:
|
||||
san_ext_dnsnames = ["(not available)"]
|
||||
for san in san_ext_dnsnames:
|
||||
print("subjectAltName:", san)
|
||||
print("Valid:", cert.not_valid_before, "->", cert.not_valid_after)
|
||||
print("Fingerprint(MD5):", binascii.hexlify(cert.fingerprint(hashes.MD5())).decode("utf-8"))
|
||||
print("Fingerprint(SHA1):", binascii.hexlify(cert.fingerprint(hashes.SHA1())).decode("utf-8"))
|
||||
|
||||
# Pick apart the HTTPResponse object to get the addinfourl
|
||||
# object initialized properly.
|
||||
|
||||
# Wrap the HTTPResponse object in socket's file object adapter
|
||||
# for Windows. That adapter calls recv(), so delegate recv()
|
||||
# to read(). This weird wrapping allows the returned object to
|
||||
# have readline() and readlines() methods.
|
||||
r.recv = r.read
|
||||
if (sys.version_info < (3, 0)):
|
||||
fp = socket._fileobject(r, close=True)
|
||||
def prompt_trust(self, cert, reason):
|
||||
if self.cert:
|
||||
# check if the certificate matches the already trusted certificate for the host and port
|
||||
if cert != self.cert:
|
||||
raise CertVerificationError([
|
||||
"Remote host identification has changed",
|
||||
"",
|
||||
"WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED!",
|
||||
"IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY!",
|
||||
"",
|
||||
f"Offending certificate is at '{self.pem_path}'"
|
||||
])
|
||||
else:
|
||||
r._decref_socketios = lambda: None
|
||||
r.ssl = h.sock.ssl
|
||||
r._timeout = -1.0
|
||||
# hack to bypass python3 bug with 0 buffer size and
|
||||
# http/client.py readinto method for response class
|
||||
if r.length is not None and r.length == 0:
|
||||
r.readinto = lambda b: 0
|
||||
r.recv_into = r.readinto
|
||||
fp = socket.SocketIO(r, 'rb')
|
||||
# since there is no trusted certificate on disk,
|
||||
# let's display the server cert and give user options to trust it
|
||||
print("The server certificate failed verification")
|
||||
print()
|
||||
self._display_cert(cert)
|
||||
print(f"Reason: {reason}")
|
||||
|
||||
resp = addinfourl(fp, r.msg, req.get_full_url())
|
||||
resp.code = r.status
|
||||
resp.msg = r.reason
|
||||
return resp
|
||||
|
||||
|
||||
class myHTTPSConnection(M2Crypto.httpslib.HTTPSConnection):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.appname = kwargs.pop('appname', 'generic')
|
||||
M2Crypto.httpslib.HTTPSConnection.__init__(self, *args, **kwargs)
|
||||
|
||||
def _connect(self, family):
|
||||
# workaround for old M2Crypto versions where the the
|
||||
# SSL.Connection.__init__ constructor has no "family" parameter
|
||||
kwargs = {}
|
||||
argspec = inspect.getargspec(SSL.Connection.__init__)
|
||||
if 'family' in argspec.args:
|
||||
kwargs['family'] = family
|
||||
elif family != socket.AF_INET:
|
||||
# old SSL.Connection classes implicitly use socket.AF_INET
|
||||
return False
|
||||
|
||||
self.sock = SSL.Connection(self.ssl_ctx, **kwargs)
|
||||
if self.session:
|
||||
self.sock.set_session(self.session)
|
||||
if hasattr(self.sock, 'set_tlsext_host_name'):
|
||||
self.sock.set_tlsext_host_name(self.host)
|
||||
self.sock.connect((self.host, self.port))
|
||||
return True
|
||||
|
||||
def connect(self):
|
||||
# based on M2Crypto.httpslib.HTTPSConnection.connect
|
||||
last_exc = None
|
||||
connected = False
|
||||
for addrinfo in socket.getaddrinfo(self.host, self.port,
|
||||
socket.AF_UNSPEC,
|
||||
socket.SOCK_STREAM,
|
||||
0, 0):
|
||||
try:
|
||||
connected = self._connect(addrinfo[0])
|
||||
if connected:
|
||||
break
|
||||
except socket.error as e:
|
||||
last_exc = e
|
||||
finally:
|
||||
if not connected and self.sock is not None:
|
||||
self.sock.close()
|
||||
if not connected:
|
||||
if last_exc is None:
|
||||
msg = 'getaddrinfo returned empty list or unsupported families'
|
||||
raise RuntimeError(msg)
|
||||
raise last_exc
|
||||
# ok we are connected, verify cert
|
||||
verify_certificate(self)
|
||||
|
||||
def getHost(self):
|
||||
return self.host
|
||||
|
||||
def getPort(self):
|
||||
return self.port
|
||||
|
||||
class myProxyHTTPSConnection(M2Crypto.httpslib.ProxyHTTPSConnection, HTTPSConnection):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.appname = kwargs.pop('appname', 'generic')
|
||||
M2Crypto.httpslib.ProxyHTTPSConnection.__init__(self, *args, **kwargs)
|
||||
|
||||
def _start_ssl(self):
|
||||
M2Crypto.httpslib.ProxyHTTPSConnection._start_ssl(self)
|
||||
verify_certificate(self)
|
||||
|
||||
def endheaders(self, *args, **kwargs):
|
||||
if self._proxy_auth is None:
|
||||
self._proxy_auth = self._encode_auth()
|
||||
HTTPSConnection.endheaders(self, *args, **kwargs)
|
||||
|
||||
# broken in m2crypto: port needs to be an int
|
||||
def putrequest(self, method, url, skip_host=0, skip_accept_encoding=0):
|
||||
#putrequest is called before connect, so can interpret url and get
|
||||
#real host/port to be used to make CONNECT request to proxy
|
||||
proto, rest = splittype(url)
|
||||
if proto is None:
|
||||
raise ValueError("unknown URL type: %s" % url)
|
||||
#get host
|
||||
host, rest = splithost(rest)
|
||||
#try to get port
|
||||
host, port = splitport(host)
|
||||
#if port is not defined try to get from proto
|
||||
if port is None:
|
||||
try:
|
||||
port = self._ports[proto]
|
||||
except KeyError:
|
||||
raise ValueError("unknown protocol for: %s" % url)
|
||||
self._real_host = host
|
||||
self._real_port = int(port)
|
||||
M2Crypto.httpslib.HTTPSConnection.putrequest(self, method, url, skip_host, skip_accept_encoding)
|
||||
|
||||
def getHost(self):
|
||||
return self._real_host
|
||||
|
||||
def getPort(self):
|
||||
return self._real_port
|
||||
|
||||
def verify_certificate(connection):
|
||||
ctx = connection.sock.ctx
|
||||
verrs = ctx.verrs
|
||||
ctx.verrs = None
|
||||
cert = connection.sock.get_peer_cert()
|
||||
if not cert:
|
||||
connection.close()
|
||||
raise SSLVerificationError("server did not present a certificate")
|
||||
|
||||
# XXX: should be check if the certificate is known anyways?
|
||||
# Maybe it changed to something valid.
|
||||
if not connection.sock.verify_ok():
|
||||
|
||||
tc = TrustedCertStore(connection.getHost(), connection.getPort(), connection.appname, cert)
|
||||
|
||||
if tc.is_known():
|
||||
|
||||
if tc.is_trusted(): # ok, same cert as the stored one
|
||||
return
|
||||
else:
|
||||
print("WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED!", file=sys.stderr)
|
||||
print("IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY!", file=sys.stderr)
|
||||
print("offending certificate is at '%s'" % tc.file, file=sys.stderr)
|
||||
raise SSLVerificationError("remote host identification has changed")
|
||||
|
||||
# if http_debug is set we redirect sys.stdout to an StringIO
|
||||
# instance in order to do some header filtering (see conf module)
|
||||
# so we have to use the "original" stdout for printing
|
||||
out = getattr(connection, '_orig_stdout', sys.stdout)
|
||||
verrs.show(out)
|
||||
|
||||
print(file=out)
|
||||
|
||||
if not verrs.could_ignore():
|
||||
raise SSLVerificationError("Certificate validation error cannot be ignored")
|
||||
|
||||
if not verrs.chain_ok:
|
||||
print("A certificate in the chain failed verification", file=out)
|
||||
if not verrs.cert_ok:
|
||||
print("The server certificate failed verification", file=out)
|
||||
|
||||
while True:
|
||||
print("""
|
||||
while True:
|
||||
print("""
|
||||
Would you like to
|
||||
0 - quit (default)
|
||||
1 - continue anyways
|
||||
2 - trust the server certificate permanently
|
||||
9 - review the server certificate
|
||||
""", file=out)
|
||||
""")
|
||||
|
||||
print("Enter choice [0129]: ", end='', file=out)
|
||||
r = raw_input()
|
||||
if not r or r == '0':
|
||||
connection.close()
|
||||
raise SSLVerificationError("Untrusted Certificate")
|
||||
elif r == '1':
|
||||
tc.trust_tmp()
|
||||
return
|
||||
elif r == '2':
|
||||
tc.trust_always()
|
||||
return
|
||||
elif r == '9':
|
||||
print(cert.as_text(), file=out)
|
||||
|
||||
# vim: sw=4 et
|
||||
print("Enter choice [0129]: ", end="")
|
||||
r = input()
|
||||
if not r or r == "0":
|
||||
raise CertVerificationError(["Untrusted certificate"])
|
||||
elif r == "1":
|
||||
self.trust_temporarily(cert)
|
||||
return
|
||||
elif r == "2":
|
||||
self.trust_permanently(cert)
|
||||
return
|
||||
elif r == "9":
|
||||
print(cert.to_txt())
|
||||
|
@ -1,8 +0,0 @@
|
||||
class NoSecureSSLError(Exception):
|
||||
def __init__(self, msg):
|
||||
Exception.__init__(self)
|
||||
self.msg = msg
|
||||
def __str__(self):
|
||||
return self.msg
|
||||
|
||||
# vim: sw=4 et
|
2
setup.py
2
setup.py
@ -103,7 +103,7 @@ setuptools.setup(
|
||||
packages=['osc', 'osc.util'],
|
||||
scripts=['osc-wrapper.py'],
|
||||
data_files=data_files,
|
||||
install_requires=['M2Crypto', 'chardet'],
|
||||
install_requires=['cryptography', 'urllib3'],
|
||||
classifiers=[
|
||||
"Development Status :: 5 - Production/Stable",
|
||||
"Environment :: Console",
|
||||
|
208
tests/common.py
208
tests/common.py
@ -1,29 +1,18 @@
|
||||
import unittest
|
||||
import osc.core
|
||||
import shutil
|
||||
import tempfile
|
||||
import io
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
try:
|
||||
# Works up to Python 3.8, needed for Python < 3.3 (inc 2.7)
|
||||
from xml.etree import cElementTree as ET
|
||||
except ImportError:
|
||||
# will import a fast implementation from 3.3 onwards, needed
|
||||
# for 3.9+
|
||||
from xml.etree import ElementTree as ET
|
||||
EXPECTED_REQUESTS = []
|
||||
import tempfile
|
||||
import unittest
|
||||
from unittest.mock import patch
|
||||
from urllib.request import HTTPHandler, addinfourl, build_opener
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
from xml.etree import ElementTree as ET
|
||||
|
||||
try:
|
||||
#python 2.x
|
||||
from cStringIO import StringIO
|
||||
from urllib2 import HTTPHandler, addinfourl, build_opener
|
||||
from urlparse import urlparse, parse_qs
|
||||
except ImportError:
|
||||
from io import StringIO
|
||||
from urllib.request import HTTPHandler, addinfourl, build_opener
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
import urllib3.response
|
||||
|
||||
import osc.core
|
||||
|
||||
from io import BytesIO
|
||||
|
||||
def urlcompare(url, *args):
|
||||
"""compare all components of url except query string - it is converted to
|
||||
@ -94,94 +83,122 @@ class RequestDataMismatch(Exception):
|
||||
def __str__(self):
|
||||
return '%s, %s, %s' % (self.url, self.got, self.exp)
|
||||
|
||||
class MyHTTPHandler(HTTPHandler):
|
||||
def __init__(self, exp_requests, fixtures_dir):
|
||||
HTTPHandler.__init__(self)
|
||||
self.__exp_requests = exp_requests
|
||||
self.__fixtures_dir = fixtures_dir
|
||||
|
||||
def http_open(self, req):
|
||||
r = self.__exp_requests.pop(0)
|
||||
if not urlcompare(req.get_full_url(), r[1]) or req.get_method() != r[0]:
|
||||
raise RequestWrongOrder(req.get_full_url(), r[1], req.get_method(), r[0])
|
||||
if req.get_method() in ('GET', 'DELETE'):
|
||||
return self.__mock_GET(r[1], **r[2])
|
||||
elif req.get_method() in ('PUT', 'POST'):
|
||||
return self.__mock_PUT(req, **r[2])
|
||||
EXPECTED_REQUESTS = []
|
||||
|
||||
def __mock_GET(self, fullurl, **kwargs):
|
||||
return self.__get_response(fullurl, **kwargs)
|
||||
|
||||
def __mock_PUT(self, req, **kwargs):
|
||||
exp = kwargs.get('exp', None)
|
||||
if exp is not None and 'expfile' in kwargs:
|
||||
raise RuntimeError('either specify exp or expfile')
|
||||
elif 'expfile' in kwargs:
|
||||
exp = open(os.path.join(self.__fixtures_dir, kwargs['expfile']), 'rb').read()
|
||||
elif exp is None:
|
||||
raise RuntimeError('exp or expfile required')
|
||||
# HACK: Fix "ValueError: I/O operation on closed file." error in tests on openSUSE Leap 15.2.
|
||||
# The problem seems to appear only in the tests, possibly some interaction with MockHTTPConnectionPool.
|
||||
# Porting 753fbc03 to urllib3 in openSUSE Leap 15.2 would fix the problem.
|
||||
urllib3.response.HTTPResponse.__iter__ = lambda self : iter(self._fp)
|
||||
|
||||
|
||||
class MockHTTPConnectionPool:
|
||||
def __init__(self, host, port=None, **conn_kw):
|
||||
pass
|
||||
|
||||
def urlopen(self, method, url, body=None, headers=None, retries=None, **response_kw):
|
||||
global EXPECTED_REQUESTS
|
||||
request = EXPECTED_REQUESTS.pop(0)
|
||||
|
||||
url = f"http://localhost{url}"
|
||||
|
||||
if not urlcompare(request["url"], url) or request["method"] != method:
|
||||
raise RequestWrongOrder(request["url"], url, request["method"], method)
|
||||
|
||||
if method in ("POST", "PUT"):
|
||||
if 'exp' not in request and 'expfile' in request:
|
||||
with open(request['expfile'], 'rb') as f:
|
||||
exp = f.read()
|
||||
elif 'exp' in request and 'expfile' not in request:
|
||||
exp = request['exp'].encode('utf-8')
|
||||
else:
|
||||
raise RuntimeError('Specify either `exp` or `expfile`')
|
||||
|
||||
body = body or b""
|
||||
if hasattr(body, "read"):
|
||||
# if it is a file-like object, read it
|
||||
body = body.read()
|
||||
if hasattr(body, "encode"):
|
||||
# if it can be encoded to bytes, do it
|
||||
body = body.encode("utf-8")
|
||||
|
||||
if body != exp:
|
||||
# We do not have a notion to explicitly mark xml content. In case
|
||||
# of xml, we do not care about the exact xml representation (for
|
||||
# now). Hence, if both, data and exp, are xml and are "equal",
|
||||
# everything is fine (for now); otherwise, error out
|
||||
# (of course, this is problematic if we want to ensure that XML
|
||||
# documents are bit identical...)
|
||||
if not xml_equal(body, exp):
|
||||
raise RequestDataMismatch(url, repr(body), repr(exp))
|
||||
|
||||
if 'exception' in request:
|
||||
raise request["exception"]
|
||||
|
||||
if 'text' not in request and 'file' in request:
|
||||
with open(request['file'], 'rb') as f:
|
||||
data = f.read()
|
||||
elif 'text' in request and 'file' not in request:
|
||||
data = request['text'].encode('utf-8')
|
||||
else:
|
||||
# for now, assume exp is a str
|
||||
exp = exp.encode('utf-8')
|
||||
# use req.data instead of req.get_data() for python3 compatiblity
|
||||
data = req.data
|
||||
if hasattr(data, 'read'):
|
||||
data = data.read()
|
||||
if data != exp:
|
||||
# We do not have a notion to explicitly mark xml content. In case
|
||||
# of xml, we do not care about the exact xml representation (for
|
||||
# now). Hence, if both, data and exp, are xml and are "equal",
|
||||
# everything is fine (for now); otherwise, error out
|
||||
# (of course, this is problematic if we want to ensure that XML
|
||||
# documents are bit identical...)
|
||||
if not xml_equal(data, exp):
|
||||
raise RequestDataMismatch(req.get_full_url(), repr(data), repr(exp))
|
||||
return self.__get_response(req.get_full_url(), **kwargs)
|
||||
raise RuntimeError('Specify either `text` or `file`')
|
||||
|
||||
def __get_response(self, url, **kwargs):
|
||||
f = None
|
||||
if 'exception' in kwargs:
|
||||
raise kwargs['exception']
|
||||
if 'text' not in kwargs and 'file' in kwargs:
|
||||
f = BytesIO(open(os.path.join(self.__fixtures_dir, kwargs['file']), 'rb').read())
|
||||
elif 'text' in kwargs and 'file' not in kwargs:
|
||||
f = BytesIO(kwargs['text'].encode('utf-8'))
|
||||
else:
|
||||
raise RuntimeError('either specify text or file')
|
||||
resp = addinfourl(f, {}, url)
|
||||
resp.code = kwargs.get('code', 200)
|
||||
resp.msg = ''
|
||||
return resp
|
||||
response = urllib3.response.HTTPResponse(body=data, status=request.get("code", 200))
|
||||
response._fp = io.BytesIO(data)
|
||||
|
||||
def urldecorator(method, fullurl, **kwargs):
|
||||
return response
|
||||
|
||||
|
||||
def urldecorator(method, url, **kwargs):
|
||||
def decorate(test_method):
|
||||
def wrapped_test_method(*args):
|
||||
addExpectedRequest(method, fullurl, **kwargs)
|
||||
test_method(*args)
|
||||
# "rename" method otherwise we cannot specify a TestCaseClass.testName
|
||||
# cmdline arg when using unittest.main()
|
||||
def wrapped_test_method(self):
|
||||
# put all args into a single dictionary
|
||||
kwargs["method"] = method
|
||||
kwargs["url"] = url
|
||||
|
||||
# prepend fixtures dir to `file`
|
||||
if "file" in kwargs:
|
||||
kwargs["file"] = os.path.join(self._get_fixtures_dir(), kwargs["file"])
|
||||
|
||||
# prepend fixtures dir to `expfile`
|
||||
if "expfile" in kwargs:
|
||||
kwargs["expfile"] = os.path.join(self._get_fixtures_dir(), kwargs["expfile"])
|
||||
|
||||
EXPECTED_REQUESTS.append(kwargs)
|
||||
|
||||
test_method(self)
|
||||
|
||||
# mock connection pool, but only just once
|
||||
if not hasattr(test_method, "_MockHTTPConnectionPool"):
|
||||
wrapped_test_method = patch('urllib3.HTTPConnectionPool', MockHTTPConnectionPool)(wrapped_test_method)
|
||||
wrapped_test_method._MockHTTPConnectionPool = True
|
||||
|
||||
wrapped_test_method.__name__ = test_method.__name__
|
||||
return wrapped_test_method
|
||||
return decorate
|
||||
|
||||
def GET(fullurl, **kwargs):
|
||||
return urldecorator('GET', fullurl, **kwargs)
|
||||
|
||||
def PUT(fullurl, **kwargs):
|
||||
return urldecorator('PUT', fullurl, **kwargs)
|
||||
def GET(path, **kwargs):
|
||||
return urldecorator('GET', path, **kwargs)
|
||||
|
||||
def POST(fullurl, **kwargs):
|
||||
return urldecorator('POST', fullurl, **kwargs)
|
||||
|
||||
def DELETE(fullurl, **kwargs):
|
||||
return urldecorator('DELETE', fullurl, **kwargs)
|
||||
def PUT(path, **kwargs):
|
||||
return urldecorator('PUT', path, **kwargs)
|
||||
|
||||
|
||||
def POST(path, **kwargs):
|
||||
return urldecorator('POST', path, **kwargs)
|
||||
|
||||
|
||||
def DELETE(path, **kwargs):
|
||||
return urldecorator('DELETE', path, **kwargs)
|
||||
|
||||
def addExpectedRequest(method, url, **kwargs):
|
||||
global EXPECTED_REQUESTS
|
||||
EXPECTED_REQUESTS.append((method, url, kwargs))
|
||||
|
||||
class OscTestCase(unittest.TestCase):
|
||||
def setUp(self, copytree=True):
|
||||
global EXPECTED_REQUESTS
|
||||
EXPECTED_REQUESTS = []
|
||||
os.chdir(os.path.dirname(__file__))
|
||||
oscrc = os.path.join(self._get_fixtures_dir(), 'oscrc')
|
||||
osc.core.conf.get_config(override_conffile=oscrc,
|
||||
@ -191,19 +208,16 @@ class OscTestCase(unittest.TestCase):
|
||||
self.tmpdir = tempfile.mkdtemp(prefix='osc_test')
|
||||
if copytree:
|
||||
shutil.copytree(os.path.join(self._get_fixtures_dir(), 'osctest'), os.path.join(self.tmpdir, 'osctest'))
|
||||
global EXPECTED_REQUESTS
|
||||
EXPECTED_REQUESTS = []
|
||||
osc.core.conf._build_opener = lambda u: build_opener(MyHTTPHandler(EXPECTED_REQUESTS, self._get_fixtures_dir()))
|
||||
self.stdout = sys.stdout
|
||||
sys.stdout = StringIO()
|
||||
sys.stdout = io.StringIO()
|
||||
|
||||
def tearDown(self):
|
||||
self.assertTrue(len(EXPECTED_REQUESTS) == 0)
|
||||
sys.stdout = self.stdout
|
||||
try:
|
||||
shutil.rmtree(self.tmpdir)
|
||||
except:
|
||||
pass
|
||||
self.assertTrue(len(EXPECTED_REQUESTS) == 0)
|
||||
|
||||
def _get_fixtures_dir(self):
|
||||
raise NotImplementedError('subclasses should implement this method')
|
||||
|
@ -4,48 +4,41 @@ import osc.oscerr
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from .common import GET, POST, OscTestCase, addExpectedRequest, EXPECTED_REQUESTS
|
||||
from .common import GET, POST, OscTestCase, EXPECTED_REQUESTS
|
||||
|
||||
|
||||
FIXTURES_DIR = os.path.join(os.path.dirname(__file__), 'prdiff_fixtures')
|
||||
API_URL = 'http://localhost/'
|
||||
UPSTREAM = 'some:project'
|
||||
BRANCH = 'home:user:branches:' + UPSTREAM
|
||||
|
||||
def rdiff_url(pkg, oldprj, newprj):
|
||||
return API_URL + 'source/%s/%s?unified=1&opackage=%s&oproject=%s&cmd=diff&expand=1&tarlimit=0&filelimit=0' % \
|
||||
return 'http://localhost/source/%s/%s?unified=1&opackage=%s&oproject=%s&cmd=diff&expand=1&tarlimit=0&filelimit=0' % \
|
||||
(newprj, pkg, pkg, oldprj.replace(':', '%3A'))
|
||||
|
||||
def request_url(prj):
|
||||
return API_URL + 'search/request?match=%%28state%%2F%%40name%%3D%%27new%%27+or+state%%2F%%40name%%3D%%27review%%27%%29+and+%%28action%%2Ftarget%%2F%%40project%%3D%%27%s%%27+or+action%%2Fsource%%2F%%40project%%3D%%27%s%%27%%29' % \
|
||||
return 'http://localhost/search/request?match=%%28state%%2F%%40name%%3D%%27new%%27+or+state%%2F%%40name%%3D%%27review%%27%%29+and+%%28action%%2Ftarget%%2F%%40project%%3D%%27%s%%27+or+action%%2Fsource%%2F%%40project%%3D%%27%s%%27%%29' % \
|
||||
tuple([prj.replace(':', '%3A')] * 2)
|
||||
|
||||
|
||||
def GET_PROJECT_PACKAGES(*projects):
|
||||
def decorator(test_method):
|
||||
def wrapped_test_method(*args):
|
||||
for project in projects:
|
||||
addExpectedRequest('GET', API_URL + 'source/' + project,
|
||||
file='%s/directory' % project)
|
||||
test_method(*args)
|
||||
# "rename" method otherwise we cannot specify a TestCaseClass.testName
|
||||
# cmdline arg when using unittest.main()
|
||||
wrapped_test_method.__name__ = test_method.__name__
|
||||
return wrapped_test_method
|
||||
# decorators get applied in the reversed order (bottom-up)
|
||||
for project in reversed(projects):
|
||||
test_method = GET(f'http://localhost/source/{project}', file=f'{project}/directory')(test_method)
|
||||
return test_method
|
||||
return decorator
|
||||
|
||||
|
||||
def POST_RDIFF(oldprj, newprj):
|
||||
def decorator(test_method):
|
||||
def wrapped_test_method(*args):
|
||||
addExpectedRequest('POST', rdiff_url('common-one', oldprj, newprj), exp='', text='')
|
||||
addExpectedRequest('POST', rdiff_url('common-two', oldprj, newprj), exp='', file='common-two-diff')
|
||||
addExpectedRequest('POST', rdiff_url('common-three', oldprj, newprj), exp='', text='')
|
||||
test_method(*args)
|
||||
# "rename" method otherwise we cannot specify a TestCaseClass.testName
|
||||
# cmdline arg when using unittest.main()
|
||||
wrapped_test_method.__name__ = test_method.__name__
|
||||
return wrapped_test_method
|
||||
# decorators get applied in the reversed order (bottom-up)
|
||||
test_method = POST(rdiff_url('common-three', oldprj, newprj), exp='', text='')(test_method)
|
||||
test_method = POST(rdiff_url('common-two', oldprj, newprj), exp='', file='common-two-diff')(test_method)
|
||||
test_method = POST(rdiff_url('common-one', oldprj, newprj), exp='', text='')(test_method)
|
||||
return test_method
|
||||
return decorator
|
||||
|
||||
|
||||
def suite():
|
||||
import unittest
|
||||
return unittest.makeSuite(TestProjectDiff)
|
||||
|
@ -206,17 +206,6 @@ class TestRepairWC(OscTestCase):
|
||||
self.assertEqual(open(os.path.join('.osc', '_apiurl')).read(), 'http://localhost\n')
|
||||
self.assertEqual(p.apiurl, 'http://localhost')
|
||||
|
||||
def test_invalidapiurl_param(self):
|
||||
"""pass an invalid apiurl to wc_repair"""
|
||||
try:
|
||||
from urllib.error import URLError
|
||||
except ImportError:
|
||||
from urllib2 import URLError
|
||||
self._change_to_pkg('invalid_apiurl')
|
||||
p = osc.core.Package('.', wc_check=False)
|
||||
self.assertRaises(URLError, p.wc_repair, 'http:/localhost')
|
||||
self.assertRaises(URLError, p.wc_repair, 'invalid')
|
||||
|
||||
def test_noapiurlNotExistingApiurl(self):
|
||||
"""the package wc has no _apiurl file and no apiurl is passed to repairwc"""
|
||||
self._change_to_pkg('noapiurl')
|
||||
@ -238,33 +227,6 @@ class TestRepairWC(OscTestCase):
|
||||
self.assertTrue(os.path.exists(os.path.join(storedir, '_apiurl')))
|
||||
self.assertEqual(open(os.path.join(storedir, '_apiurl'), 'r').read(), 'http://localhost\n')
|
||||
|
||||
def test_project_invalidapiurl(self):
|
||||
"""the project wc has an invalid _apiurl file (invalid url format)"""
|
||||
import shutil
|
||||
prj_dir = os.path.join(self.tmpdir, 'prj_invalidapiurl')
|
||||
shutil.copytree(os.path.join(self._get_fixtures_dir(), 'prj_invalidapiurl'), prj_dir)
|
||||
storedir = os.path.join(prj_dir, osc.core.store)
|
||||
self.assertRaises(osc.oscerr.WorkingCopyInconsistent, osc.core.Project, prj_dir, getPackageList=False)
|
||||
prj = osc.core.Project(prj_dir, wc_check=False, getPackageList=False)
|
||||
prj.wc_repair('http://localhost')
|
||||
self.assertTrue(os.path.exists(os.path.join(storedir, '_apiurl')))
|
||||
self.assertTrue(os.path.exists(os.path.join(storedir, '_apiurl')))
|
||||
self.assertEqual(open(os.path.join(storedir, '_apiurl'), 'r').read(), 'http://localhost\n')
|
||||
|
||||
def test_project_invalidapiurl_param(self):
|
||||
"""pass an invalid apiurl to wc_repair"""
|
||||
import shutil
|
||||
try:
|
||||
from urllib.error import URLError
|
||||
except ImportError:
|
||||
from urllib2 import URLError
|
||||
prj_dir = os.path.join(self.tmpdir, 'prj_invalidapiurl')
|
||||
shutil.copytree(os.path.join(self._get_fixtures_dir(), 'prj_invalidapiurl'), prj_dir)
|
||||
storedir = os.path.join(prj_dir, osc.core.store)
|
||||
self.assertRaises(osc.oscerr.WorkingCopyInconsistent, osc.core.Project, prj_dir, getPackageList=False)
|
||||
prj = osc.core.Project(prj_dir, wc_check=False, getPackageList=False)
|
||||
self.assertRaises(URLError, prj.wc_repair, 'http:/localhost')
|
||||
self.assertRaises(URLError, prj.wc_repair, 'invalid')
|
||||
|
||||
if __name__ == '__main__':
|
||||
import unittest
|
||||
|
Loading…
Reference in New Issue
Block a user