Replace the os.system() shell strings used for svn/git/hg downloads in
PackageIndex with subprocess.check_call() argument lists, routed through
a single _download_vcs() helper, and update the tests accordingly.

NOTE(review): this patch deletes _download_svn, _download_git and
_download_hg; confirm that nothing outside these hunks still calls them.

Index: setuptools-67.7.2/setuptools/package_index.py
===================================================================
--- setuptools-67.7.2.orig/setuptools/package_index.py
+++ setuptools-67.7.2/setuptools/package_index.py
@@ -1,11 +1,13 @@
 """PyPI and direct package downloading."""
 
 import sys
+import subprocess
 import os
 import re
 import io
 import shutil
 import socket
+import warnings
 import base64
 import hashlib
 import itertools
@@ -587,7 +589,7 @@ class PackageIndex(Environment):
             scheme = URL_SCHEME(spec)
             if scheme:
                 # It's a url, download it to tmpdir
-                found = self._download_url(scheme.group(1), spec, tmpdir)
+                found = self._download_url(spec, tmpdir)
                 base, fragment = egg_info_for_url(spec)
                 if base.endswith('.py'):
                     found = self.gen_setup(found, fragment, tmpdir)
@@ -814,7 +816,7 @@ class PackageIndex(Environment):
             else:
                 raise DistutilsError("Download error for %s: %s" % (url, v)) from v
 
-    def _download_url(self, scheme, url, tmpdir):
+    def _download_url(self, url, tmpdir):
         # Determine download filename
         #
         name, fragment = egg_info_for_url(url)
@@ -829,19 +831,84 @@ class PackageIndex(Environment):
 
         filename = os.path.join(tmpdir, name)
 
-        # Download the file
-        #
-        if scheme == 'svn' or scheme.startswith('svn+'):
-            return self._download_svn(url, filename)
-        elif scheme == 'git' or scheme.startswith('git+'):
-            return self._download_git(url, filename)
-        elif scheme.startswith('hg+'):
-            return self._download_hg(url, filename)
-        elif scheme == 'file':
-            return urllib.request.url2pathname(urllib.parse.urlparse(url)[2])
+        return self._download_vcs(url, filename) or self._download_other(url, filename)
+
+    @staticmethod
+    def _resolve_vcs(url):
+        """
+        Return the VCS tool named by the scheme of *url*, or None.
+
+        >>> rvcs = PackageIndex._resolve_vcs
+        >>> rvcs('git+http://foo/bar')
+        'git'
+        >>> rvcs('hg+https://foo/bar')
+        'hg'
+        >>> rvcs('git:myhost')
+        'git'
+        >>> rvcs('hg:myhost')
+        >>> rvcs('http://foo/bar')
+        """
+        scheme = urllib.parse.urlsplit(url).scheme
+        pre, sep, post = scheme.partition('+')
+        # svn and git have their own protocol; hg does not
+        allowed = set(['svn', 'git'] + ['hg'] * bool(sep))
+        return next(iter({pre} & allowed), None)
+
+    def _download_vcs(self, url, spec_filename):
+        """
+        Check out *url* with its VCS client into *spec_filename*.
+
+        Return the checkout path, or None if *url* is not a VCS URL.
+        """
+        vcs = self._resolve_vcs(url)
+        if not vcs:
+            return
+        if vcs == 'svn':
+            warnings.warn("SVN download support is deprecated", UserWarning)
+
+        filename, _, _ = spec_filename.partition('#')
+        url, rev = self._vcs_split_rev_from_url(url)
+        svn_creds = []
+        if url.lower().startswith('svn:') and '@' in url:
+            parsed_url = urllib.parse.urlparse(url)
+            if parsed_url.username and parsed_url.password:
+                svn_creds.extend(
+                    ["--username", parsed_url.username,
+                     "--password", parsed_url.password])
+            elif parsed_url.username and not parsed_url.password:
+                svn_creds.extend(["--username", parsed_url.username])
+            # Strip the userinfo from the URL. Split on the last '@' so a
+            # password containing '@' does not leak into the host name.
+            domain = parsed_url.netloc.rpartition('@')[2]
+            parsed_url = parsed_url._replace(netloc=domain)
+            url = urllib.parse.urlunparse(parsed_url)
+
+        self.info("Doing %s clone from %s to %s" % (vcs, url, filename))
+        if vcs == 'svn':
+            cmd_line = [vcs, 'checkout', '-q'] + svn_creds + [url, filename]
+            subprocess.check_call(cmd_line)
+            return filename
         else:
-            self.url_ok(url, True)  # raises error if not allowed
-            return self._attempt_download(url, filename)
+            subprocess.check_call([vcs, 'clone', '--quiet', url, filename])
+
+        co_commands = dict(
+            git=[vcs, '-C', filename, 'checkout', '--quiet', rev],
+            hg=[vcs, '--cwd', filename, 'up', '-C', '-r', rev, '-q'],
+        )
+        if rev is not None:
+            self.info("Checking out %s" % rev)
+            subprocess.check_call(co_commands[vcs])
+
+        return filename
+
+    def _download_other(self, url, filename):
+        """Map a file: URL to a local path; else pass *url* to _attempt_download."""
+        scheme = urllib.parse.urlsplit(url).scheme
+        if scheme == 'file':  # pragma: no cover
+            return urllib.request.url2pathname(urllib.parse.urlparse(url).path)
+        # raise error if not allowed
+        self.url_ok(url, True)
+        return self._attempt_download(url, filename)
 
     def scan_url(self, url):
         self.process_url(url, True)
@@ -868,87 +935,36 @@ class PackageIndex(Environment):
         os.unlink(filename)
         raise DistutilsError("Unexpected HTML page found at " + url)
 
-    def _download_svn(self, url, filename):
-        SetuptoolsDeprecationWarning.emit(
-            "Invalid config",
-            f"SVN download support is deprecated: {url}",
-            due_date=(2023, 6, 1),  # Initially introduced in 23 Sept 2018
-        )
-        url = url.split('#', 1)[0]  # remove any fragment for svn's sake
-        creds = ''
-        if url.lower().startswith('svn:') and '@' in url:
-            scheme, netloc, path, p, q, f = urllib.parse.urlparse(url)
-            if not netloc and path.startswith('//') and '/' in path[2:]:
-                netloc, path = path[2:].split('/', 1)
-                auth, host = _splituser(netloc)
-                if auth:
-                    if ':' in auth:
-                        user, pw = auth.split(':', 1)
-                        creds = " --username=%s --password=%s" % (user, pw)
-                    else:
-                        creds = " --username=" + auth
-                    netloc = host
-                    parts = scheme, netloc, url, p, q, f
-                    url = urllib.parse.urlunparse(parts)
-        self.info("Doing subversion checkout from %s to %s", url, filename)
-        os.system("svn checkout%s -q %s %s" % (creds, url, filename))
-        return filename
-
     @staticmethod
-    def _vcs_split_rev_from_url(url, pop_prefix=False):
-        scheme, netloc, path, query, frag = urllib.parse.urlsplit(url)
-
-        scheme = scheme.split('+', 1)[-1]
-
-        # Some fragment identification fails
-        path = path.split('#', 1)[0]
+    def _vcs_split_rev_from_url(url):
+        """
+        Given a possible VCS URL, return a clean URL and resolved revision if any.
+
+        >>> vsrfu = PackageIndex._vcs_split_rev_from_url
+        >>> vsrfu('git+https://github.com/pypa/setuptools@v69.0.0#egg-info=setuptools')
+        ('https://github.com/pypa/setuptools', 'v69.0.0')
+        >>> vsrfu('git+https://github.com/pypa/setuptools#egg-info=setuptools')
+        ('https://github.com/pypa/setuptools', None)
+        >>> vsrfu('http://foo/bar')
+        ('http://foo/bar', None)
+        """
+        parts = urllib.parse.urlsplit(url)
 
-        rev = None
-        if '@' in path:
-            path, rev = path.rsplit('@', 1)
+        clean_scheme = parts.scheme.split('+', 1)[-1]
 
         # Also, discard fragment
-        url = urllib.parse.urlunsplit((scheme, netloc, path, query, ''))
-
-        return url, rev
+        no_fragment_path, _, _ = parts.path.partition('#')
 
-    def _download_git(self, url, filename):
-        filename = filename.split('#', 1)[0]
-        url, rev = self._vcs_split_rev_from_url(url, pop_prefix=True)
-
-        self.info("Doing git clone from %s to %s", url, filename)
-        os.system("git clone --quiet %s %s" % (url, filename))
-
-        if rev is not None:
-            self.info("Checking out %s", rev)
-            os.system(
-                "git -C %s checkout --quiet %s"
-                % (
-                    filename,
-                    rev,
-                )
-            )
+        pre, sep, post = no_fragment_path.rpartition('@')
+        clean_path, rev = (pre, post) if sep else (post, None)
 
-        return filename
-
-    def _download_hg(self, url, filename):
-        filename = filename.split('#', 1)[0]
-        url, rev = self._vcs_split_rev_from_url(url, pop_prefix=True)
-
-        self.info("Doing hg clone from %s to %s", url, filename)
-        os.system("hg clone --quiet %s %s" % (url, filename))
-
-        if rev is not None:
-            self.info("Updating to %s", rev)
-            os.system(
-                "hg --cwd %s up -C -r %s -q"
-                % (
-                    filename,
-                    rev,
-                )
-            )
-
-        return filename
+        resolved = parts._replace(
+            scheme=clean_scheme,
+            path=clean_path,
+            # discard the fragment
+            fragment='',
+        ).geturl()
+        return resolved, rev
 
     def debug(self, msg, *args):
         log.debug(msg, *args)
Index: setuptools-67.7.2/setuptools/tests/test_packageindex.py
===================================================================
--- setuptools-67.7.2.orig/setuptools/tests/test_packageindex.py
+++ setuptools-67.7.2/setuptools/tests/test_packageindex.py
@@ -190,53 +190,50 @@ class TestPackageIndex:
         url = 'git+https://github.example/group/project@master#egg=foo'
         index = setuptools.package_index.PackageIndex()
 
-        with mock.patch("os.system") as os_system_mock:
+        with mock.patch("subprocess.check_call") as subprocess_mock:
             result = index.download(url, str(tmpdir))
 
-        os_system_mock.assert_called()
-
         expected_dir = str(tmpdir / 'project@master')
-        expected = (
-            'git clone --quiet ' 'https://github.example/group/project {expected_dir}'
-        ).format(**locals())
-        first_call_args = os_system_mock.call_args_list[0][0]
-        assert first_call_args == (expected,)
-
-        tmpl = 'git -C {expected_dir} checkout --quiet master'
-        expected = tmpl.format(**locals())
-        assert os_system_mock.call_args_list[1][0] == (expected,)
+        expected_clone = mock.call([
+            'git', 'clone', '--quiet', 'https://github.example/group/project',
+            expected_dir,
+        ])
+        expected_checkout = mock.call([
+            'git', '-C', expected_dir, 'checkout', '--quiet', 'master',
+        ])
+        subprocess_mock.assert_has_calls((expected_clone, expected_checkout))
+        assert subprocess_mock.call_count == 2
         assert result == expected_dir
 
     def test_download_git_no_rev(self, tmpdir):
         url = 'git+https://github.example/group/project#egg=foo'
         index = setuptools.package_index.PackageIndex()
 
-        with mock.patch("os.system") as os_system_mock:
-            result = index.download(url, str(tmpdir))
-
-        os_system_mock.assert_called()
+        with mock.patch("subprocess.check_call") as subprocess_mock:
+            result = index.download(url, str(tmpdir))
 
-        expected_dir = str(tmpdir / 'project')
-        expected = (
-            'git clone --quiet ' 'https://github.example/group/project {expected_dir}'
-        ).format(**locals())
-        os_system_mock.assert_called_once_with(expected)
+        expected_dir = os.path.join(str(tmpdir), 'project')
+        expected_clone = [
+            'git', 'clone', '--quiet', 'https://github.example/group/project',
+            expected_dir,
+        ]
+        subprocess_mock.assert_called_once_with(expected_clone)
+        assert subprocess_mock.call_count == 1
+        assert result == expected_dir
 
     def test_download_svn(self, tmpdir):
         url = 'svn+https://svn.example/project#egg=foo'
         index = setuptools.package_index.PackageIndex()
 
         with pytest.warns(UserWarning):
-            with mock.patch("os.system") as os_system_mock:
+            with mock.patch("subprocess.check_call") as subprocess_mock:
                 result = index.download(url, str(tmpdir))
 
-        os_system_mock.assert_called()
-
         expected_dir = str(tmpdir / 'project')
-        expected = (
-            'svn checkout -q ' 'svn+https://svn.example/project {expected_dir}'
-        ).format(**locals())
-        os_system_mock.assert_called_once_with(expected)
+        expected = [
+            'svn', 'checkout', '-q', 'https://svn.example/project', expected_dir
+        ]
+        subprocess_mock.assert_called_once_with(expected)
 
 
 class TestContentCheckers: