python-paramiko/1311.patch
Matej Cepl a1cab7a938 Accepting request 676196 from home:mimi_vx:branches:devel:languages:python
- drop python-pytest_relaxed dependency
- add patches:
   1311.patch - fix warnings
   1379.patch - fix support for gssapi
   relaxed.patch - remove unnecessary pytest_relaxed dep
- remove patch:
   disable-gssapi.patch - supersseded

OBS-URL: https://build.opensuse.org/request/show/676196
OBS-URL: https://build.opensuse.org/package/show/devel:languages:python/python-paramiko?expand=0&rev=81
2019-02-14 19:17:12 +00:00

1162 lines
45 KiB
Diff

diff --git a/paramiko/ssh_gss.py b/paramiko/ssh_gss.py
index eb8826e01..4c2454712 100644
--- a/paramiko/ssh_gss.py
+++ b/paramiko/ssh_gss.py
@@ -47,12 +47,18 @@
#: :var str _API: Constraint for the used API
-_API = "MIT"
+_API = None
try:
import gssapi
-
- GSS_EXCEPTIONS = (gssapi.GSSException,)
+ if hasattr(gssapi, '__title__') and gssapi.__title__ == 'python-gssapi':
+ # old, unmaintained python-gssapi package
+ _API = "MIT" # keep this for compatibility
+ GSS_EXCEPTIONS = (gssapi.GSSException,)
+ else:
+ _API = "PYTHON-GSSAPI-NEW"
+ GSS_EXCEPTIONS = (gssapi.exceptions.GeneralError,
+ gssapi.raw.misc.GSSError,)
except (ImportError, OSError):
try:
import pywintypes
@@ -67,6 +73,7 @@
from paramiko.common import MSG_USERAUTH_REQUEST
from paramiko.ssh_exception import SSHException
+from paramiko._version import __version_info__
def GSSAuth(auth_method, gss_deleg_creds=True):
@@ -77,21 +84,24 @@ def GSSAuth(auth_method, gss_deleg_creds=True):
(gssapi-with-mic or gss-keyex)
:param bool gss_deleg_creds: Delegate client credentials or not.
We delegate credentials by default.
- :return: Either an `._SSH_GSSAPI` (Unix) object or an
- `_SSH_SSPI` (Windows) object
+ :return: Either an `._SSH_GSSAPI_OLD` or `._SSH_GSSAPI_NEW` (Unix)
+ object or an `_SSH_SSPI` (Windows) object
+ :rtype: Object
:raises: ``ImportError`` -- If no GSS-API / SSPI module could be imported.
:see: `RFC 4462 <http://www.ietf.org/rfc/rfc4462.txt>`_
- :note: Check for the available API and return either an `._SSH_GSSAPI`
- (MIT GSSAPI) object or an `._SSH_SSPI` (MS SSPI) object. If you
- get python-gssapi working on Windows, python-gssapi
- will be used and a `._SSH_GSSAPI` object will be returned.
+ :note: Check for the available API and return either an `._SSH_GSSAPI_OLD`
+ (MIT GSSAPI using python-gssapi package) object, an `._SSH_GSSAPI_NEW`
+ (MIT GSSAPI using gssapi package) object
+ or an `._SSH_SSPI` (MS SSPI) object.
If there is no supported API available,
``None`` will be returned.
"""
if _API == "MIT":
- return _SSH_GSSAPI(auth_method, gss_deleg_creds)
+ return _SSH_GSSAPI_OLD(auth_method, gss_deleg_creds)
+ elif _API == "PYTHON-GSSAPI-NEW":
+ return _SSH_GSSAPI_NEW(auth_method, gss_deleg_creds)
elif _API == "SSPI" and os.name == "nt":
return _SSH_SSPI(auth_method, gss_deleg_creds)
else:
@@ -100,7 +110,7 @@ def GSSAuth(auth_method, gss_deleg_creds=True):
class _SSH_GSSAuth(object):
"""
- Contains the shared variables and methods of `._SSH_GSSAPI` and
+ Contains the shared variables and methods of `._SSH_GSSAPI_*` and
`._SSH_SSPI`.
"""
@@ -222,9 +232,10 @@ def _ssh_build_mic(self, session_id, username, service, auth_method):
return mic
-class _SSH_GSSAPI(_SSH_GSSAuth):
+class _SSH_GSSAPI_OLD(_SSH_GSSAuth):
"""
- Implementation of the GSS-API MIT Kerberos Authentication for SSH2.
+ Implementation of the GSS-API MIT Kerberos Authentication for SSH2,
+ using the older (unmaintained) python-gssapi package.
:see: `.GSSAuth`
"""
@@ -399,6 +410,174 @@ def save_client_creds(self, client_token):
raise NotImplementedError
+if __version_info__[0] == 2 and __version_info__[0] <= 4:
+ # provide the old name for strict backward compatibility
+ _SSH_GSSAPI = _SSH_GSSAPI_OLD
+
+
+class _SSH_GSSAPI_NEW(_SSH_GSSAuth):
+ """
+ Implementation of the GSS-API MIT Kerberos Authentication for SSH2,
+ using the newer, currently maintained gssapi package.
+
+ :see: `.GSSAuth`
+ """
+ def __init__(self, auth_method, gss_deleg_creds):
+ """
+ :param str auth_method: The name of the SSH authentication mechanism
+ (gssapi-with-mic or gss-keyex)
+ :param bool gss_deleg_creds: Delegate client credentials or not
+ """
+ _SSH_GSSAuth.__init__(self, auth_method, gss_deleg_creds)
+
+ if self._gss_deleg_creds:
+ self._gss_flags = (gssapi.RequirementFlag.protection_ready,
+ gssapi.RequirementFlag.integrity,
+ gssapi.RequirementFlag.mutual_authentication,
+ gssapi.RequirementFlag.delegate_to_peer)
+ else:
+ self._gss_flags = (gssapi.RequirementFlag.protection_ready,
+ gssapi.RequirementFlag.integrity,
+ gssapi.RequirementFlag.mutual_authentication)
+
+ def ssh_init_sec_context(self, target, desired_mech=None,
+ username=None, recv_token=None):
+ """
+ Initialize a GSS-API context.
+
+ :param str username: The name of the user who attempts to login
+ :param str target: The hostname of the target to connect to
+ :param str desired_mech: The negotiated GSS-API mechanism
+ ("pseudo negotiated" mechanism, because we
+ support just the krb5 mechanism :-))
+ :param str recv_token: The GSS-API token received from the Server
+ :raise SSHException: Is raised if the desired mechanism of the client
+ is not supported
+ :raise gssapi.exceptions.GSSError: if there is an error signaled by the
+ GSS-API implementation
+ :return: A ``String`` if the GSS-API has returned a token or ``None`` if
+ no token was returned
+ :rtype: String or None
+ """
+ self._username = username
+ self._gss_host = target
+ targ_name = gssapi.Name("host@" + self._gss_host,
+ name_type=gssapi.NameType.hostbased_service)
+ if desired_mech is not None:
+ mech, __ = decoder.decode(desired_mech)
+ if mech.__str__() != self._krb5_mech:
+ raise SSHException("Unsupported mechanism OID.")
+ krb5_mech = gssapi.MechType.kerberos
+ token = None
+ if recv_token is None:
+ self._gss_ctxt = gssapi.SecurityContext(name=targ_name,
+ flags=self._gss_flags,
+ mech=krb5_mech,
+ usage='initiate')
+ token = self._gss_ctxt.step(token)
+ else:
+ token = self._gss_ctxt.step(recv_token)
+ self._gss_ctxt_status = self._gss_ctxt.complete
+ return token
+
+ def ssh_get_mic(self, session_id, gss_kex=False):
+ """
+ Create the MIC token for a SSH2 message.
+
+ :param str session_id: The SSH session ID
+ :param bool gss_kex: Generate the MIC for GSS-API Key Exchange or not
+ :return: gssapi-with-mic:
+ Returns the MIC token from GSS-API for the message we created
+ with ``_ssh_build_mic``.
+ gssapi-keyex:
+ Returns the MIC token from GSS-API with the SSH session ID as
+ message.
+ :rtype: String
+ :see: `._ssh_build_mic`
+ """
+ self._session_id = session_id
+ if not gss_kex:
+ mic_field = self._ssh_build_mic(self._session_id,
+ self._username,
+ self._service,
+ self._auth_method)
+ mic_token = self._gss_ctxt.get_signature(mic_field)
+ else:
+ # for key exchange with gssapi-keyex
+ mic_token = self._gss_srv_ctxt.get_signature(self._session_id)
+ return mic_token
+
+ def ssh_accept_sec_context(self, hostname, recv_token, username=None):
+ """
+ Accept a GSS-API context (server mode).
+
+ :param str hostname: The servers hostname
+ :param str username: The name of the user who attempts to login
+ :param str recv_token: The GSS-API Token received from the server,
+ if it's not the initial call.
+ :return: A ``String`` if the GSS-API has returned a token or ``None``
+ if no token was returned
+ :rtype: String or None
+ """
+ # hostname and username are not required for GSSAPI, but for SSPI
+ self._gss_host = hostname
+ self._username = username
+ if self._gss_srv_ctxt is None:
+ self._gss_srv_ctxt = gssapi.SecurityContext(usage='accept')
+ token = self._gss_srv_ctxt.step(recv_token)
+ self._gss_srv_ctxt_status = self._gss_srv_ctxt.complete
+ return token
+
+ def ssh_check_mic(self, mic_token, session_id, username=None):
+ """
+ Verify the MIC token for a SSH2 message.
+
+ :param str mic_token: The MIC token received from the client
+ :param str session_id: The SSH session ID
+ :param str username: The name of the user who attempts to login
+ :return: None if the MIC check was successful
+ :raises gssapi.exceptions.GSSError: if the MIC check failed
+ """
+ self._session_id = session_id
+ self._username = username
+ if self._username is not None:
+ # server mode
+ mic_field = self._ssh_build_mic(self._session_id,
+ self._username,
+ self._service,
+ self._auth_method)
+ self._gss_srv_ctxt.verify_signature(mic_field, mic_token)
+ else:
+ # for key exchange with gssapi-keyex
+ # client mode
+ self._gss_ctxt.verify_signature(self._session_id,
+ mic_token)
+
+ @property
+ def credentials_delegated(self):
+ """
+ Checks if credentials are delegated (server mode).
+
+ :return: ``True`` if credentials are delegated, otherwise ``False``
+ :rtype: bool
+ """
+ if self._gss_srv_ctxt.delegated_creds is not None:
+ return True
+ return False
+
+ def save_client_creds(self, client_token):
+ """
+ Save the Client token in a file. This is used by the SSH server
+ to store the client credentials if credentials are delegated
+ (server mode).
+
+ :param str client_token: The GSS-API token received form the client
+ :raise NotImplementedError: Credential delegation is currently not
+ supported in server mode
+ """
+ raise NotImplementedError
+
+
class _SSH_SSPI(_SSH_GSSAuth):
"""
Implementation of the Microsoft SSPI Kerberos Authentication for SSH2.
From a20936e18c0336476f8f4976257699a52e9a996e Mon Sep 17 00:00:00 2001
From: Hugh Cole-Baker <sigmaris@gmail.com>
Date: Sat, 10 Dec 2016 15:31:22 +0000
Subject: [PATCH 2/8] Test the new and old Python GSSAPI packages
---
tests/test_gssapi.py | 49 ++++++++++++++++++++++++++++++++++++++++++--
1 file changed, 47 insertions(+), 2 deletions(-)
diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py
index 3e8c39e80..04304c0f2 100644
--- a/tests/test_gssapi.py
+++ b/tests/test_gssapi.py
@@ -52,9 +52,12 @@ def test_2_gssapi_sspi(self):
"""
Test the used methods of python-gssapi or sspi, sspicon from pywin32.
"""
- _API = "MIT"
try:
import gssapi
+ if hasattr(gssapi, '__title__') and gssapi.__title__ == 'python-gssapi':
+ _API = "PYTHON-GSSAPI-OLD"
+ else:
+ _API = "PYTHON-GSSAPI-NEW"
except ImportError:
import sspicon
import sspi
@@ -65,7 +68,7 @@ def test_2_gssapi_sspi(self):
gss_ctxt_status = False
mic_msg = b"G'day Mate!"
- if _API == "MIT":
+ if _API == "PYTHON-GSSAPI-OLD":
if self.server_mode:
gss_flags = (
gssapi.C_PROT_READY_FLAG,
@@ -113,6 +116,48 @@ def test_2_gssapi_sspi(self):
# Check MIC
status = gss_srv_ctxt.verify_mic(mic_msg, mic_token)
self.assertEquals(0, status)
+ elif _API == "PYTHON-GSSAPI-NEW":
+ if server_mode:
+ gss_flags = (gssapi.RequirementFlag.protection_ready,
+ gssapi.RequirementFlag.integrity,
+ gssapi.RequirementFlag.mutual_authentication,
+ gssapi.RequirementFlag.delegate_to_peer)
+ else:
+ gss_flags = (gssapi.RequirementFlag.protection_ready,
+ gssapi.RequirementFlag.integrity,
+ gssapi.RequirementFlag.delegate_to_peer)
+ # Initialize a GSS-API context.
+ krb5_oid = gssapi.MechType.kerberos
+ target_name = gssapi.Name("host@" + targ_name,
+ name_type=gssapi.NameType.hostbased_service)
+ gss_ctxt = gssapi.SecurityContext(name=target_name,
+ flags=gss_flags,
+ mech=krb5_oid,
+ usage='initiate')
+ if server_mode:
+ c_token = gss_ctxt.step(c_token)
+ gss_ctxt_status = gss_ctxt.complete
+ self.assertEquals(False, gss_ctxt_status)
+ # Accept a GSS-API context.
+ gss_srv_ctxt = gssapi.SecurityContext(usage='accept')
+ s_token = gss_srv_ctxt.step(c_token)
+ gss_ctxt_status = gss_srv_ctxt.complete
+ self.assertNotEquals(None, s_token)
+ self.assertEquals(True, gss_ctxt_status)
+ # Establish the client context
+ c_token = gss_ctxt.step(s_token)
+ self.assertEquals(None, c_token)
+ else:
+ while not gss_ctxt.complete:
+ c_token = gss_ctxt.step(c_token)
+ self.assertNotEquals(None, c_token)
+ # Build MIC
+ mic_token = gss_ctxt.get_signature(mic_msg)
+
+ if server_mode:
+ # Check MIC
+ status = gss_srv_ctxt.verify_signature(mic_msg, mic_token)
+ self.assertEquals(0, status)
else:
gss_flags = (
sspicon.ISC_REQ_INTEGRITY
From db358dc149f7549c147e520bbe5c26b571d899d4 Mon Sep 17 00:00:00 2001
From: Anselm Kruis <a.kruis@science-computing.de>
Date: Fri, 9 Feb 2018 23:48:47 +0100
Subject: [PATCH 3/8] Fix Sphinx and PEP 8 warnings
---
paramiko/ssh_gss.py | 33 +++++++++++++++------------------
1 file changed, 15 insertions(+), 18 deletions(-)
diff --git a/paramiko/ssh_gss.py b/paramiko/ssh_gss.py
index 4c2454712..ff2fa065b 100644
--- a/paramiko/ssh_gss.py
+++ b/paramiko/ssh_gss.py
@@ -86,14 +86,14 @@ def GSSAuth(auth_method, gss_deleg_creds=True):
We delegate credentials by default.
:return: Either an `._SSH_GSSAPI_OLD` or `._SSH_GSSAPI_NEW` (Unix)
object or an `_SSH_SSPI` (Windows) object
- :rtype: Object
+ :rtype: object
:raises: ``ImportError`` -- If no GSS-API / SSPI module could be imported.
:see: `RFC 4462 <http://www.ietf.org/rfc/rfc4462.txt>`_
:note: Check for the available API and return either an `._SSH_GSSAPI_OLD`
- (MIT GSSAPI using python-gssapi package) object, an `._SSH_GSSAPI_NEW`
- (MIT GSSAPI using gssapi package) object
+ (MIT GSSAPI using python-gssapi package) object, an
+ `._SSH_GSSAPI_NEW` (MIT GSSAPI using gssapi package) object
or an `._SSH_SSPI` (MS SSPI) object.
If there is no supported API available,
``None`` will be returned.
@@ -110,8 +110,8 @@ def GSSAuth(auth_method, gss_deleg_creds=True):
class _SSH_GSSAuth(object):
"""
- Contains the shared variables and methods of `._SSH_GSSAPI_*` and
- `._SSH_SSPI`.
+ Contains the shared variables and methods of `._SSH_GSSAPI_OLD`,
+ `._SSH_GSSAPI_NEW` and `._SSH_SSPI`.
"""
def __init__(self, auth_method, gss_deleg_creds):
@@ -451,13 +451,12 @@ def ssh_init_sec_context(self, target, desired_mech=None,
("pseudo negotiated" mechanism, because we
support just the krb5 mechanism :-))
:param str recv_token: The GSS-API token received from the Server
- :raise SSHException: Is raised if the desired mechanism of the client
- is not supported
- :raise gssapi.exceptions.GSSError: if there is an error signaled by the
- GSS-API implementation
- :return: A ``String`` if the GSS-API has returned a token or ``None`` if
- no token was returned
- :rtype: String or None
+ :raises: `.SSHException` -- Is raised if the desired mechanism of the
+ client is not supported
+ :raises: ``gssapi.exceptions.GSSError`` if there is an error signaled
+ by the GSS-API implementation
+ :return: A ``String`` if the GSS-API has returned a token or ``None``
+ if no token was returned
"""
self._username = username
self._gss_host = target
@@ -492,8 +491,7 @@ def ssh_get_mic(self, session_id, gss_kex=False):
gssapi-keyex:
Returns the MIC token from GSS-API with the SSH session ID as
message.
- :rtype: String
- :see: `._ssh_build_mic`
+ :rtype: str
"""
self._session_id = session_id
if not gss_kex:
@@ -517,7 +515,6 @@ def ssh_accept_sec_context(self, hostname, recv_token, username=None):
if it's not the initial call.
:return: A ``String`` if the GSS-API has returned a token or ``None``
if no token was returned
- :rtype: String or None
"""
# hostname and username are not required for GSSAPI, but for SSPI
self._gss_host = hostname
@@ -536,7 +533,7 @@ def ssh_check_mic(self, mic_token, session_id, username=None):
:param str session_id: The SSH session ID
:param str username: The name of the user who attempts to login
:return: None if the MIC check was successful
- :raises gssapi.exceptions.GSSError: if the MIC check failed
+ :raises: ``gssapi.exceptions.GSSError`` -- if the MIC check failed
"""
self._session_id = session_id
self._username = username
@@ -572,8 +569,8 @@ def save_client_creds(self, client_token):
(server mode).
:param str client_token: The GSS-API token received form the client
- :raise NotImplementedError: Credential delegation is currently not
- supported in server mode
+ :raises: ``NotImplementedError`` -- Credential delegation is currently
+ not supported in server mode
"""
raise NotImplementedError
From a36499fd8762a19da43ee16429b148cb89f4d39f Mon Sep 17 00:00:00 2001
From: Anselm Kruis <Anselm.Kruis@atos.net>
Date: Fri, 5 Oct 2018 18:15:24 +0200
Subject: [PATCH 4/8] Use k5test (if available) to execute GSSAPI related tests
Previously testing of GSSAPI (Kerberos) related functions required an
externally provided Kerberos environment. Therefore all GSSAPI tests were
skipped.
Now the package k5test is used to setup a self-contained Kerberos environment.
Because k5test requires the new GSSAPI, this commit also merges
pull request #1166 and fixes broken GSSAPI test. If k5test is not available
(i.e. on Windows), the tests still get skipped.
The test case test_kex_gss.test_2_gsskex_and_auth_rekey is expected to fail.
---
tests/test_gssapi.py | 41 ++++++++++++-------
tests/test_kex_gss.py | 20 ++++------
tests/test_ssh_gss.py | 15 +++----
tests/util.py | 93 ++++++++++++++++++++++++++++++++++++++++---
4 files changed, 131 insertions(+), 38 deletions(-)
diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py
index 04304c0f2..98d4d14ec 100644
--- a/tests/test_gssapi.py
+++ b/tests/test_gssapi.py
@@ -22,20 +22,19 @@
Test the used APIs for GSS-API / SSPI authentication
"""
-import unittest
import socket
-from .util import needs_gssapi
+from .util import needs_gssapi, KerberosTestCase, update_env
@needs_gssapi
-class GSSAPITest(unittest.TestCase):
- def setup():
- # TODO: these vars should all come from os.environ or whatever the
- # approved pytest method is for runtime-configuring test data.
+class GSSAPITest(KerberosTestCase):
+ def setUp(self):
+ super(GSSAPITest, self).setUp()
self.krb5_mech = "1.2.840.113554.1.2.2"
- self.targ_name = "hostname"
+ self.targ_name = self.realm.hostname
self.server_mode = False
+ update_env(self, self.realm.env)
def test_1_pyasn1(self):
"""
@@ -48,13 +47,14 @@ def test_1_pyasn1(self):
mech, __ = decoder.decode(oid)
self.assertEquals(self.krb5_mech, mech.__str__())
- def test_2_gssapi_sspi(self):
+ def _gssapi_sspi_test(self):
"""
Test the used methods of python-gssapi or sspi, sspicon from pywin32.
"""
try:
import gssapi
- if hasattr(gssapi, '__title__') and gssapi.__title__ == 'python-gssapi':
+ if (hasattr(gssapi, '__title__') and
+ gssapi.__title__ == 'python-gssapi'):
_API = "PYTHON-GSSAPI-OLD"
else:
_API = "PYTHON-GSSAPI-NEW"
@@ -117,7 +117,7 @@ def test_2_gssapi_sspi(self):
status = gss_srv_ctxt.verify_mic(mic_msg, mic_token)
self.assertEquals(0, status)
elif _API == "PYTHON-GSSAPI-NEW":
- if server_mode:
+ if self.server_mode:
gss_flags = (gssapi.RequirementFlag.protection_ready,
gssapi.RequirementFlag.integrity,
gssapi.RequirementFlag.mutual_authentication,
@@ -128,13 +128,13 @@ def test_2_gssapi_sspi(self):
gssapi.RequirementFlag.delegate_to_peer)
# Initialize a GSS-API context.
krb5_oid = gssapi.MechType.kerberos
- target_name = gssapi.Name("host@" + targ_name,
- name_type=gssapi.NameType.hostbased_service)
+ target_name = gssapi.Name("host@" + self.targ_name,
+ name_type=gssapi.NameType.hostbased_service)
gss_ctxt = gssapi.SecurityContext(name=target_name,
flags=gss_flags,
mech=krb5_oid,
usage='initiate')
- if server_mode:
+ if self.server_mode:
c_token = gss_ctxt.step(c_token)
gss_ctxt_status = gss_ctxt.complete
self.assertEquals(False, gss_ctxt_status)
@@ -154,7 +154,7 @@ def test_2_gssapi_sspi(self):
# Build MIC
mic_token = gss_ctxt.get_signature(mic_msg)
- if server_mode:
+ if self.server_mode:
# Check MIC
status = gss_srv_ctxt.verify_signature(mic_msg, mic_token)
self.assertEquals(0, status)
@@ -190,3 +190,16 @@ def test_2_gssapi_sspi(self):
error, token = gss_ctxt.authorize(c_token)
c_token = token[0].Buffer
self.assertNotEquals(0, error)
+
+ def test_2_gssapi_sspi_client(self):
+ """
+ Test the used methods of python-gssapi or sspi, sspicon from pywin32.
+ """
+ self._gssapi_sspi_test()
+
+ def test_3_gssapi_sspi_server(self):
+ """
+ Test the used methods of python-gssapi or sspi, sspicon from pywin32.
+ """
+ self.server_mode = True
+ self._gssapi_sspi_test()
diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py
index c71ff91c2..e58be65d9 100644
--- a/tests/test_kex_gss.py
+++ b/tests/test_kex_gss.py
@@ -31,7 +31,7 @@
import paramiko
-from .util import needs_gssapi
+from .util import needs_gssapi, KerberosTestCase, update_env
class NullServer(paramiko.ServerInterface):
@@ -59,21 +59,16 @@ def check_channel_exec_request(self, channel, command):
@needs_gssapi
-class GSSKexTest(unittest.TestCase):
- @staticmethod
- def init(username, hostname):
- global krb5_principal, targ_name
- krb5_principal = username
- targ_name = hostname
-
+class GSSKexTest(KerberosTestCase):
def setUp(self):
- self.username = krb5_principal
- self.hostname = socket.getfqdn(targ_name)
+ self.username = self.realm.user_princ
+ self.hostname = socket.getfqdn(self.realm.hostname)
self.sockl = socket.socket()
- self.sockl.bind((targ_name, 0))
+ self.sockl.bind((self.realm.hostname, 0))
self.sockl.listen(1)
self.addr, self.port = self.sockl.getsockname()
self.event = threading.Event()
+ update_env(self, self.realm.env)
thread = threading.Thread(target=self._run)
thread.start()
@@ -87,7 +82,7 @@ def _run(self):
self.ts = paramiko.Transport(self.socks, gss_kex=True)
host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key")
self.ts.add_server_key(host_key)
- self.ts.set_gss_host(targ_name)
+ self.ts.set_gss_host(self.realm.hostname)
try:
self.ts.load_server_moduli()
except:
@@ -150,6 +145,7 @@ def test_1_gsskex_and_auth(self):
"""
self._test_gsskex_and_auth(gss_host=None)
+ @unittest.expectedFailure # to be investigated
def test_2_gsskex_and_auth_rekey(self):
"""
Verify that Paramiko can rekey.
diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py
index b6b501528..d326f522e 100644
--- a/tests/test_ssh_gss.py
+++ b/tests/test_ssh_gss.py
@@ -25,11 +25,10 @@
import socket
import threading
-import unittest
import paramiko
-from .util import _support, needs_gssapi
+from .util import _support, needs_gssapi, KerberosTestCase, update_env
from .test_client import FINGERPRINTS
@@ -67,17 +66,18 @@ def check_channel_exec_request(self, channel, command):
@needs_gssapi
-class GSSAuthTest(unittest.TestCase):
+class GSSAuthTest(KerberosTestCase):
def setUp(self):
# TODO: username and targ_name should come from os.environ or whatever
# the approved pytest method is for runtime-configuring test data.
- self.username = "krb5_principal"
- self.hostname = socket.getfqdn("targ_name")
+ self.username = self.realm.user_princ
+ self.hostname = socket.getfqdn(self.realm.hostname)
self.sockl = socket.socket()
- self.sockl.bind(("targ_name", 0))
+ self.sockl.bind((self.realm.hostname, 0))
self.sockl.listen(1)
self.addr, self.port = self.sockl.getsockname()
self.event = threading.Event()
+ update_env(self, self.realm.env)
thread = threading.Thread(target=self._run)
thread.start()
@@ -148,7 +148,8 @@ def test_1_gss_auth(self):
def test_2_auth_trickledown(self):
"""
- Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding
+ Failed gssapi-with-mic auth doesn't prevent subsequent key auth from
+ succeeding
"""
self.hostname = (
"this_host_does_not_exists_and_causes_a_GSSAPI-exception"
diff --git a/tests/util.py b/tests/util.py
index 4ca023743..be56b37dc 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -1,19 +1,20 @@
from os.path import dirname, realpath, join
+import os
+import sys
+import unittest
import pytest
from paramiko.py3compat import builtins
+from paramiko.ssh_gss import GSS_AUTH_AVAILABLE
def _support(filename):
return join(dirname(realpath(__file__)), filename)
-# TODO: consider using pytest.importorskip('gssapi') instead? We presumably
-# still need CLI configurability for the Kerberos parameters, though, so can't
-# JUST key off presence of GSSAPI optional dependency...
-# TODO: anyway, s/True/os.environ.get('RUN_GSSAPI', False)/ or something.
-needs_gssapi = pytest.mark.skipif(True, reason="No GSSAPI to test")
+needs_gssapi = pytest.mark.skipif(not GSS_AUTH_AVAILABLE,
+ reason="No GSSAPI to test")
def needs_builtin(name):
@@ -25,3 +26,85 @@ def needs_builtin(name):
slow = pytest.mark.slow
+
+# GSSAPI / Kerberos related tests need a working Kerberos environment.
+# The class `KerberosTestCase` provides such an environment or skips all tests.
+# There are 3 distinct cases:
+#
+# - A Kerberos environment has already been created and the environment
+# contains the required information.
+#
+# - We can use the package 'k5test' to setup an working kerberos environment on
+# the fly.
+#
+# - We skip all tests.
+#
+# ToDo: add a Windows specific implementation?
+
+if (os.environ.get("K5TEST_USER_PRINC", None) and
+ os.environ.get("K5TEST_HOSTNAME", None) and
+ os.environ.get("KRB5_KTNAME", None)): # add other vars as needed
+
+ # The environment provides the required information
+ class DummyK5Realm(object):
+ def __init__(self):
+ for k in os.environ:
+ if not k.startswith("K5TEST_"):
+ continue
+ setattr(self, k[7:].lower(), os.environ[k])
+ self.env = {}
+
+ class KerberosTestCase(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.realm = DummyK5Realm()
+
+ @classmethod
+ def tearDownClass(cls):
+ del cls.realm
+else:
+ try:
+ # Try to setup a kerberos environment
+ from k5test import KerberosTestCase
+ except Exception:
+ # Use a dummy, that skips all tests
+ class KerberosTestCase(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ raise unittest.SkipTest('Missing extension package k5test. '
+ 'Please run "pip install k5test" '
+ 'to install it.')
+
+def update_env(testcase, mapping, env=os.environ):
+ """Modify os.environ during a test case and restore during cleanup."""
+ saved_env = env.copy()
+ def replace(target, source):
+ target.update(source)
+ for k in list(target):
+ if k not in source:
+ target.pop(k, None)
+ testcase.addCleanup(replace, env, saved_env)
+ env.update(mapping)
+ return testcase
+
+def k5shell(args=None):
+ """Create a shell with an kerberos environment
+
+ This can be used to debug paramiko or to test the old GSSAPI.
+ To test a different GSSAPI, simply activate a suitable venv
+ within the shell.
+ """
+ import k5test
+ import atexit
+ import subprocess
+ k5 = k5test.K5Realm()
+ atexit.register(k5.stop)
+ os.environ.update(k5.env)
+ for n in ("realm", "user_princ", "hostname"):
+ os.environ["K5TEST_" + n.upper()] = getattr(k5, n)
+
+ if not args:
+ args = sys.argv[1:]
+ if not args:
+ args = [os.environ.get("SHELL", "bash")]
+ sys.exit(subprocess.call(args))
From a8e8f9aa89c2c1fe65e4477d8d553eb5e669c927 Mon Sep 17 00:00:00 2001
From: Anselm Kruis <Anselm.Kruis@atos.net>
Date: Fri, 5 Oct 2018 19:30:48 +0200
Subject: [PATCH 5/8] Reformatted as proposed by travis.blacken
---
paramiko/ssh_gss.py | 74 +++++++++++++++++++++++++++-----------------
tests/test_gssapi.py | 43 +++++++++++++++----------
tests/util.py | 28 ++++++++++++-----
3 files changed, 92 insertions(+), 53 deletions(-)
diff --git a/paramiko/ssh_gss.py b/paramiko/ssh_gss.py
index ff2fa065b..06aac761c 100644
--- a/paramiko/ssh_gss.py
+++ b/paramiko/ssh_gss.py
@@ -51,14 +51,17 @@
try:
import gssapi
- if hasattr(gssapi, '__title__') and gssapi.__title__ == 'python-gssapi':
+
+ if hasattr(gssapi, "__title__") and gssapi.__title__ == "python-gssapi":
# old, unmaintained python-gssapi package
_API = "MIT" # keep this for compatibility
GSS_EXCEPTIONS = (gssapi.GSSException,)
else:
_API = "PYTHON-GSSAPI-NEW"
- GSS_EXCEPTIONS = (gssapi.exceptions.GeneralError,
- gssapi.raw.misc.GSSError,)
+ GSS_EXCEPTIONS = (
+ gssapi.exceptions.GeneralError,
+ gssapi.raw.misc.GSSError,
+ )
except (ImportError, OSError):
try:
import pywintypes
@@ -422,6 +425,7 @@ class _SSH_GSSAPI_NEW(_SSH_GSSAuth):
:see: `.GSSAuth`
"""
+
def __init__(self, auth_method, gss_deleg_creds):
"""
:param str auth_method: The name of the SSH authentication mechanism
@@ -431,17 +435,22 @@ def __init__(self, auth_method, gss_deleg_creds):
_SSH_GSSAuth.__init__(self, auth_method, gss_deleg_creds)
if self._gss_deleg_creds:
- self._gss_flags = (gssapi.RequirementFlag.protection_ready,
- gssapi.RequirementFlag.integrity,
- gssapi.RequirementFlag.mutual_authentication,
- gssapi.RequirementFlag.delegate_to_peer)
+ self._gss_flags = (
+ gssapi.RequirementFlag.protection_ready,
+ gssapi.RequirementFlag.integrity,
+ gssapi.RequirementFlag.mutual_authentication,
+ gssapi.RequirementFlag.delegate_to_peer,
+ )
else:
- self._gss_flags = (gssapi.RequirementFlag.protection_ready,
- gssapi.RequirementFlag.integrity,
- gssapi.RequirementFlag.mutual_authentication)
+ self._gss_flags = (
+ gssapi.RequirementFlag.protection_ready,
+ gssapi.RequirementFlag.integrity,
+ gssapi.RequirementFlag.mutual_authentication,
+ )
- def ssh_init_sec_context(self, target, desired_mech=None,
- username=None, recv_token=None):
+ def ssh_init_sec_context(
+ self, target, desired_mech=None, username=None, recv_token=None
+ ):
"""
Initialize a GSS-API context.
@@ -460,8 +469,10 @@ def ssh_init_sec_context(self, target, desired_mech=None,
"""
self._username = username
self._gss_host = target
- targ_name = gssapi.Name("host@" + self._gss_host,
- name_type=gssapi.NameType.hostbased_service)
+ targ_name = gssapi.Name(
+ "host@" + self._gss_host,
+ name_type=gssapi.NameType.hostbased_service,
+ )
if desired_mech is not None:
mech, __ = decoder.decode(desired_mech)
if mech.__str__() != self._krb5_mech:
@@ -469,10 +480,12 @@ def ssh_init_sec_context(self, target, desired_mech=None,
krb5_mech = gssapi.MechType.kerberos
token = None
if recv_token is None:
- self._gss_ctxt = gssapi.SecurityContext(name=targ_name,
- flags=self._gss_flags,
- mech=krb5_mech,
- usage='initiate')
+ self._gss_ctxt = gssapi.SecurityContext(
+ name=targ_name,
+ flags=self._gss_flags,
+ mech=krb5_mech,
+ usage="initiate",
+ )
token = self._gss_ctxt.step(token)
else:
token = self._gss_ctxt.step(recv_token)
@@ -495,10 +508,12 @@ def ssh_get_mic(self, session_id, gss_kex=False):
"""
self._session_id = session_id
if not gss_kex:
- mic_field = self._ssh_build_mic(self._session_id,
- self._username,
- self._service,
- self._auth_method)
+ mic_field = self._ssh_build_mic(
+ self._session_id,
+ self._username,
+ self._service,
+ self._auth_method,
+ )
mic_token = self._gss_ctxt.get_signature(mic_field)
else:
# for key exchange with gssapi-keyex
@@ -520,7 +535,7 @@ def ssh_accept_sec_context(self, hostname, recv_token, username=None):
self._gss_host = hostname
self._username = username
if self._gss_srv_ctxt is None:
- self._gss_srv_ctxt = gssapi.SecurityContext(usage='accept')
+ self._gss_srv_ctxt = gssapi.SecurityContext(usage="accept")
token = self._gss_srv_ctxt.step(recv_token)
self._gss_srv_ctxt_status = self._gss_srv_ctxt.complete
return token
@@ -539,16 +554,17 @@ def ssh_check_mic(self, mic_token, session_id, username=None):
self._username = username
if self._username is not None:
# server mode
- mic_field = self._ssh_build_mic(self._session_id,
- self._username,
- self._service,
- self._auth_method)
+ mic_field = self._ssh_build_mic(
+ self._session_id,
+ self._username,
+ self._service,
+ self._auth_method,
+ )
self._gss_srv_ctxt.verify_signature(mic_field, mic_token)
else:
# for key exchange with gssapi-keyex
# client mode
- self._gss_ctxt.verify_signature(self._session_id,
- mic_token)
+ self._gss_ctxt.verify_signature(self._session_id, mic_token)
@property
def credentials_delegated(self):
diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py
index 98d4d14ec..8e6ec37ab 100644
--- a/tests/test_gssapi.py
+++ b/tests/test_gssapi.py
@@ -53,8 +53,11 @@ def _gssapi_sspi_test(self):
"""
try:
import gssapi
- if (hasattr(gssapi, '__title__') and
- gssapi.__title__ == 'python-gssapi'):
+
+ if (
+ hasattr(gssapi, "__title__")
+ and gssapi.__title__ == "python-gssapi"
+ ):
_API = "PYTHON-GSSAPI-OLD"
else:
_API = "PYTHON-GSSAPI-NEW"
@@ -118,28 +121,36 @@ def _gssapi_sspi_test(self):
self.assertEquals(0, status)
elif _API == "PYTHON-GSSAPI-NEW":
if self.server_mode:
- gss_flags = (gssapi.RequirementFlag.protection_ready,
- gssapi.RequirementFlag.integrity,
- gssapi.RequirementFlag.mutual_authentication,
- gssapi.RequirementFlag.delegate_to_peer)
+ gss_flags = (
+ gssapi.RequirementFlag.protection_ready,
+ gssapi.RequirementFlag.integrity,
+ gssapi.RequirementFlag.mutual_authentication,
+ gssapi.RequirementFlag.delegate_to_peer,
+ )
else:
- gss_flags = (gssapi.RequirementFlag.protection_ready,
- gssapi.RequirementFlag.integrity,
- gssapi.RequirementFlag.delegate_to_peer)
+ gss_flags = (
+ gssapi.RequirementFlag.protection_ready,
+ gssapi.RequirementFlag.integrity,
+ gssapi.RequirementFlag.delegate_to_peer,
+ )
# Initialize a GSS-API context.
krb5_oid = gssapi.MechType.kerberos
- target_name = gssapi.Name("host@" + self.targ_name,
- name_type=gssapi.NameType.hostbased_service)
- gss_ctxt = gssapi.SecurityContext(name=target_name,
- flags=gss_flags,
- mech=krb5_oid,
- usage='initiate')
+ target_name = gssapi.Name(
+ "host@" + self.targ_name,
+ name_type=gssapi.NameType.hostbased_service,
+ )
+ gss_ctxt = gssapi.SecurityContext(
+ name=target_name,
+ flags=gss_flags,
+ mech=krb5_oid,
+ usage="initiate",
+ )
if self.server_mode:
c_token = gss_ctxt.step(c_token)
gss_ctxt_status = gss_ctxt.complete
self.assertEquals(False, gss_ctxt_status)
# Accept a GSS-API context.
- gss_srv_ctxt = gssapi.SecurityContext(usage='accept')
+ gss_srv_ctxt = gssapi.SecurityContext(usage="accept")
s_token = gss_srv_ctxt.step(c_token)
gss_ctxt_status = gss_srv_ctxt.complete
self.assertNotEquals(None, s_token)
diff --git a/tests/util.py b/tests/util.py
index be56b37dc..cdc835c95 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -13,8 +13,9 @@ def _support(filename):
return join(dirname(realpath(__file__)), filename)
-needs_gssapi = pytest.mark.skipif(not GSS_AUTH_AVAILABLE,
- reason="No GSSAPI to test")
+needs_gssapi = pytest.mark.skipif(
+ not GSS_AUTH_AVAILABLE, reason="No GSSAPI to test"
+)
def needs_builtin(name):
@@ -41,9 +42,11 @@ def needs_builtin(name):
#
# ToDo: add a Windows specific implementation?
-if (os.environ.get("K5TEST_USER_PRINC", None) and
- os.environ.get("K5TEST_HOSTNAME", None) and
- os.environ.get("KRB5_KTNAME", None)): # add other vars as needed
+if (
+ os.environ.get("K5TEST_USER_PRINC", None)
+ and os.environ.get("K5TEST_HOSTNAME", None)
+ and os.environ.get("KRB5_KTNAME", None)
+): # add other vars as needed
# The environment provides the required information
class DummyK5Realm(object):
@@ -62,6 +65,8 @@ def setUpClass(cls):
@classmethod
def tearDownClass(cls):
del cls.realm
+
+
else:
try:
# Try to setup a kerberos environment
@@ -71,22 +76,28 @@ def tearDownClass(cls):
class KerberosTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
- raise unittest.SkipTest('Missing extension package k5test. '
- 'Please run "pip install k5test" '
- 'to install it.')
+ raise unittest.SkipTest(
+ "Missing extension package k5test. "
+ 'Please run "pip install k5test" '
+ "to install it."
+ )
+
def update_env(testcase, mapping, env=os.environ):
"""Modify os.environ during a test case and restore during cleanup."""
saved_env = env.copy()
+
def replace(target, source):
target.update(source)
for k in list(target):
if k not in source:
target.pop(k, None)
+
testcase.addCleanup(replace, env, saved_env)
env.update(mapping)
return testcase
+
def k5shell(args=None):
"""Create a shell with an kerberos environment
@@ -97,6 +108,7 @@ def k5shell(args=None):
import k5test
import atexit
import subprocess
+
k5 = k5test.K5Realm()
atexit.register(k5.stop)
os.environ.update(k5.env)
From 1694e6e46032c63ab9e1015adedda0cf1cc14912 Mon Sep 17 00:00:00 2001
From: Anselm Kruis <anselm.kruis@atos.net>
Date: Sat, 6 Oct 2018 16:09:33 +0200
Subject: [PATCH 6/8] fix GSSAPI tests for Python3 (trivial)
---
tests/test_kex_gss.py | 2 +-
tests/test_ssh_gss.py | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py
index e58be65d9..7e53795f1 100644
--- a/tests/test_kex_gss.py
+++ b/tests/test_kex_gss.py
@@ -53,7 +53,7 @@ def check_channel_request(self, kind, chanid):
return paramiko.OPEN_SUCCEEDED
def check_channel_exec_request(self, channel, command):
- if command != "yes":
+ if command != b"yes":
return False
return True
diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py
index d326f522e..8e4cb962f 100644
--- a/tests/test_ssh_gss.py
+++ b/tests/test_ssh_gss.py
@@ -60,7 +60,7 @@ def check_channel_request(self, kind, chanid):
return paramiko.OPEN_SUCCEEDED
def check_channel_exec_request(self, channel, command):
- if command != "yes":
+ if command != b"yes":
return False
return True
From f3af0b3e697adc8902039b21fde93871048160e4 Mon Sep 17 00:00:00 2001
From: Anselm Kruis <Anselm.Kruis@atos.net>
Date: Mon, 8 Oct 2018 09:17:50 +0200
Subject: [PATCH 8/8] Update a comment
---
tests/test_kex_gss.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py
index 7e53795f1..6bac01201 100644
--- a/tests/test_kex_gss.py
+++ b/tests/test_kex_gss.py
@@ -145,7 +145,7 @@ def test_1_gsskex_and_auth(self):
"""
self._test_gsskex_and_auth(gss_host=None)
- @unittest.expectedFailure # to be investigated
+ @unittest.expectedFailure # to be investigated, see https://github.com/paramiko/paramiko/issues/1312
def test_2_gsskex_and_auth_rekey(self):
"""
Verify that Paramiko can rekey.