diff --git a/paramiko/ssh_gss.py b/paramiko/ssh_gss.py index eb8826e01..4c2454712 100644 --- a/paramiko/ssh_gss.py +++ b/paramiko/ssh_gss.py @@ -47,12 +47,18 @@ #: :var str _API: Constraint for the used API -_API = "MIT" +_API = None try: import gssapi - - GSS_EXCEPTIONS = (gssapi.GSSException,) + if hasattr(gssapi, '__title__') and gssapi.__title__ == 'python-gssapi': + # old, unmaintained python-gssapi package + _API = "MIT" # keep this for compatibility + GSS_EXCEPTIONS = (gssapi.GSSException,) + else: + _API = "PYTHON-GSSAPI-NEW" + GSS_EXCEPTIONS = (gssapi.exceptions.GeneralError, + gssapi.raw.misc.GSSError,) except (ImportError, OSError): try: import pywintypes @@ -67,6 +73,7 @@ from paramiko.common import MSG_USERAUTH_REQUEST from paramiko.ssh_exception import SSHException +from paramiko._version import __version_info__ def GSSAuth(auth_method, gss_deleg_creds=True): @@ -77,21 +84,24 @@ def GSSAuth(auth_method, gss_deleg_creds=True): (gssapi-with-mic or gss-keyex) :param bool gss_deleg_creds: Delegate client credentials or not. We delegate credentials by default. - :return: Either an `._SSH_GSSAPI` (Unix) object or an - `_SSH_SSPI` (Windows) object + :return: Either an `._SSH_GSSAPI_OLD` or `._SSH_GSSAPI_NEW` (Unix) + object or an `_SSH_SSPI` (Windows) object + :rtype: Object :raises: ``ImportError`` -- If no GSS-API / SSPI module could be imported. :see: `RFC 4462 `_ - :note: Check for the available API and return either an `._SSH_GSSAPI` - (MIT GSSAPI) object or an `._SSH_SSPI` (MS SSPI) object. If you - get python-gssapi working on Windows, python-gssapi - will be used and a `._SSH_GSSAPI` object will be returned. + :note: Check for the available API and return either an `._SSH_GSSAPI_OLD` + (MIT GSSAPI using python-gssapi package) object, an `._SSH_GSSAPI_NEW` + (MIT GSSAPI using gssapi package) object + or an `._SSH_SSPI` (MS SSPI) object. If there is no supported API available, ``None`` will be returned. 
""" if _API == "MIT": - return _SSH_GSSAPI(auth_method, gss_deleg_creds) + return _SSH_GSSAPI_OLD(auth_method, gss_deleg_creds) + elif _API == "PYTHON-GSSAPI-NEW": + return _SSH_GSSAPI_NEW(auth_method, gss_deleg_creds) elif _API == "SSPI" and os.name == "nt": return _SSH_SSPI(auth_method, gss_deleg_creds) else: @@ -100,7 +110,7 @@ def GSSAuth(auth_method, gss_deleg_creds=True): class _SSH_GSSAuth(object): """ - Contains the shared variables and methods of `._SSH_GSSAPI` and + Contains the shared variables and methods of `._SSH_GSSAPI_*` and `._SSH_SSPI`. """ @@ -222,9 +232,10 @@ def _ssh_build_mic(self, session_id, username, service, auth_method): return mic -class _SSH_GSSAPI(_SSH_GSSAuth): +class _SSH_GSSAPI_OLD(_SSH_GSSAuth): """ - Implementation of the GSS-API MIT Kerberos Authentication for SSH2. + Implementation of the GSS-API MIT Kerberos Authentication for SSH2, + using the older (unmaintained) python-gssapi package. :see: `.GSSAuth` """ @@ -399,6 +410,174 @@ def save_client_creds(self, client_token): raise NotImplementedError +if __version_info__[0] == 2 and __version_info__[1] <= 4: + # provide the old name for strict backward compatibility + _SSH_GSSAPI = _SSH_GSSAPI_OLD + + +class _SSH_GSSAPI_NEW(_SSH_GSSAuth): + """ + Implementation of the GSS-API MIT Kerberos Authentication for SSH2, + using the newer, currently maintained gssapi package. 
+ + :see: `.GSSAuth` + """ + def __init__(self, auth_method, gss_deleg_creds): + """ + :param str auth_method: The name of the SSH authentication mechanism + (gssapi-with-mic or gss-keyex) + :param bool gss_deleg_creds: Delegate client credentials or not + """ + _SSH_GSSAuth.__init__(self, auth_method, gss_deleg_creds) + + if self._gss_deleg_creds: + self._gss_flags = (gssapi.RequirementFlag.protection_ready, + gssapi.RequirementFlag.integrity, + gssapi.RequirementFlag.mutual_authentication, + gssapi.RequirementFlag.delegate_to_peer) + else: + self._gss_flags = (gssapi.RequirementFlag.protection_ready, + gssapi.RequirementFlag.integrity, + gssapi.RequirementFlag.mutual_authentication) + + def ssh_init_sec_context(self, target, desired_mech=None, + username=None, recv_token=None): + """ + Initialize a GSS-API context. + + :param str username: The name of the user who attempts to login + :param str target: The hostname of the target to connect to + :param str desired_mech: The negotiated GSS-API mechanism + ("pseudo negotiated" mechanism, because we + support just the krb5 mechanism :-)) + :param str recv_token: The GSS-API token received from the Server + :raise SSHException: Is raised if the desired mechanism of the client + is not supported + :raise gssapi.exceptions.GSSError: if there is an error signaled by the + GSS-API implementation + :return: A ``String`` if the GSS-API has returned a token or ``None`` if + no token was returned + :rtype: String or None + """ + self._username = username + self._gss_host = target + targ_name = gssapi.Name("host@" + self._gss_host, + name_type=gssapi.NameType.hostbased_service) + if desired_mech is not None: + mech, __ = decoder.decode(desired_mech) + if mech.__str__() != self._krb5_mech: + raise SSHException("Unsupported mechanism OID.") + krb5_mech = gssapi.MechType.kerberos + token = None + if recv_token is None: + self._gss_ctxt = gssapi.SecurityContext(name=targ_name, + flags=self._gss_flags, + mech=krb5_mech, + 
usage='initiate') + token = self._gss_ctxt.step(token) + else: + token = self._gss_ctxt.step(recv_token) + self._gss_ctxt_status = self._gss_ctxt.complete + return token + + def ssh_get_mic(self, session_id, gss_kex=False): + """ + Create the MIC token for a SSH2 message. + + :param str session_id: The SSH session ID + :param bool gss_kex: Generate the MIC for GSS-API Key Exchange or not + :return: gssapi-with-mic: + Returns the MIC token from GSS-API for the message we created + with ``_ssh_build_mic``. + gssapi-keyex: + Returns the MIC token from GSS-API with the SSH session ID as + message. + :rtype: String + :see: `._ssh_build_mic` + """ + self._session_id = session_id + if not gss_kex: + mic_field = self._ssh_build_mic(self._session_id, + self._username, + self._service, + self._auth_method) + mic_token = self._gss_ctxt.get_signature(mic_field) + else: + # for key exchange with gssapi-keyex + mic_token = self._gss_srv_ctxt.get_signature(self._session_id) + return mic_token + + def ssh_accept_sec_context(self, hostname, recv_token, username=None): + """ + Accept a GSS-API context (server mode). + + :param str hostname: The servers hostname + :param str username: The name of the user who attempts to login + :param str recv_token: The GSS-API Token received from the server, + if it's not the initial call. + :return: A ``String`` if the GSS-API has returned a token or ``None`` + if no token was returned + :rtype: String or None + """ + # hostname and username are not required for GSSAPI, but for SSPI + self._gss_host = hostname + self._username = username + if self._gss_srv_ctxt is None: + self._gss_srv_ctxt = gssapi.SecurityContext(usage='accept') + token = self._gss_srv_ctxt.step(recv_token) + self._gss_srv_ctxt_status = self._gss_srv_ctxt.complete + return token + + def ssh_check_mic(self, mic_token, session_id, username=None): + """ + Verify the MIC token for a SSH2 message. 
+ + :param str mic_token: The MIC token received from the client + :param str session_id: The SSH session ID + :param str username: The name of the user who attempts to login + :return: None if the MIC check was successful + :raises gssapi.exceptions.GSSError: if the MIC check failed + """ + self._session_id = session_id + self._username = username + if self._username is not None: + # server mode + mic_field = self._ssh_build_mic(self._session_id, + self._username, + self._service, + self._auth_method) + self._gss_srv_ctxt.verify_signature(mic_field, mic_token) + else: + # for key exchange with gssapi-keyex + # client mode + self._gss_ctxt.verify_signature(self._session_id, + mic_token) + + @property + def credentials_delegated(self): + """ + Checks if credentials are delegated (server mode). + + :return: ``True`` if credentials are delegated, otherwise ``False`` + :rtype: bool + """ + if self._gss_srv_ctxt.delegated_creds is not None: + return True + return False + + def save_client_creds(self, client_token): + """ + Save the Client token in a file. This is used by the SSH server + to store the client credentials if credentials are delegated + (server mode). + + :param str client_token: The GSS-API token received form the client + :raise NotImplementedError: Credential delegation is currently not + supported in server mode + """ + raise NotImplementedError + + class _SSH_SSPI(_SSH_GSSAuth): """ Implementation of the Microsoft SSPI Kerberos Authentication for SSH2. 
From a20936e18c0336476f8f4976257699a52e9a996e Mon Sep 17 00:00:00 2001 From: Hugh Cole-Baker Date: Sat, 10 Dec 2016 15:31:22 +0000 Subject: [PATCH 2/8] Test the new and old Python GSSAPI packages --- tests/test_gssapi.py | 49 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 47 insertions(+), 2 deletions(-) diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py index 3e8c39e80..04304c0f2 100644 --- a/tests/test_gssapi.py +++ b/tests/test_gssapi.py @@ -52,9 +52,12 @@ def test_2_gssapi_sspi(self): """ Test the used methods of python-gssapi or sspi, sspicon from pywin32. """ - _API = "MIT" try: import gssapi + if hasattr(gssapi, '__title__') and gssapi.__title__ == 'python-gssapi': + _API = "PYTHON-GSSAPI-OLD" + else: + _API = "PYTHON-GSSAPI-NEW" except ImportError: import sspicon import sspi @@ -65,7 +68,7 @@ def test_2_gssapi_sspi(self): gss_ctxt_status = False mic_msg = b"G'day Mate!" - if _API == "MIT": + if _API == "PYTHON-GSSAPI-OLD": if self.server_mode: gss_flags = ( gssapi.C_PROT_READY_FLAG, @@ -113,6 +116,48 @@ def test_2_gssapi_sspi(self): # Check MIC status = gss_srv_ctxt.verify_mic(mic_msg, mic_token) self.assertEquals(0, status) + elif _API == "PYTHON-GSSAPI-NEW": + if server_mode: + gss_flags = (gssapi.RequirementFlag.protection_ready, + gssapi.RequirementFlag.integrity, + gssapi.RequirementFlag.mutual_authentication, + gssapi.RequirementFlag.delegate_to_peer) + else: + gss_flags = (gssapi.RequirementFlag.protection_ready, + gssapi.RequirementFlag.integrity, + gssapi.RequirementFlag.delegate_to_peer) + # Initialize a GSS-API context. + krb5_oid = gssapi.MechType.kerberos + target_name = gssapi.Name("host@" + targ_name, + name_type=gssapi.NameType.hostbased_service) + gss_ctxt = gssapi.SecurityContext(name=target_name, + flags=gss_flags, + mech=krb5_oid, + usage='initiate') + if server_mode: + c_token = gss_ctxt.step(c_token) + gss_ctxt_status = gss_ctxt.complete + self.assertEquals(False, gss_ctxt_status) + # Accept a GSS-API context. 
+ gss_srv_ctxt = gssapi.SecurityContext(usage='accept') + s_token = gss_srv_ctxt.step(c_token) + gss_ctxt_status = gss_srv_ctxt.complete + self.assertNotEquals(None, s_token) + self.assertEquals(True, gss_ctxt_status) + # Establish the client context + c_token = gss_ctxt.step(s_token) + self.assertEquals(None, c_token) + else: + while not gss_ctxt.complete: + c_token = gss_ctxt.step(c_token) + self.assertNotEquals(None, c_token) + # Build MIC + mic_token = gss_ctxt.get_signature(mic_msg) + + if server_mode: + # Check MIC + status = gss_srv_ctxt.verify_signature(mic_msg, mic_token) + self.assertEquals(0, status) else: gss_flags = ( sspicon.ISC_REQ_INTEGRITY From db358dc149f7549c147e520bbe5c26b571d899d4 Mon Sep 17 00:00:00 2001 From: Anselm Kruis Date: Fri, 9 Feb 2018 23:48:47 +0100 Subject: [PATCH 3/8] Fix Sphinx and PEP 8 warnings --- paramiko/ssh_gss.py | 33 +++++++++++++++------------------ 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/paramiko/ssh_gss.py b/paramiko/ssh_gss.py index 4c2454712..ff2fa065b 100644 --- a/paramiko/ssh_gss.py +++ b/paramiko/ssh_gss.py @@ -86,14 +86,14 @@ def GSSAuth(auth_method, gss_deleg_creds=True): We delegate credentials by default. :return: Either an `._SSH_GSSAPI_OLD` or `._SSH_GSSAPI_NEW` (Unix) object or an `_SSH_SSPI` (Windows) object - :rtype: Object + :rtype: object :raises: ``ImportError`` -- If no GSS-API / SSPI module could be imported. :see: `RFC 4462 `_ :note: Check for the available API and return either an `._SSH_GSSAPI_OLD` - (MIT GSSAPI using python-gssapi package) object, an `._SSH_GSSAPI_NEW` - (MIT GSSAPI using gssapi package) object + (MIT GSSAPI using python-gssapi package) object, an + `._SSH_GSSAPI_NEW` (MIT GSSAPI using gssapi package) object or an `._SSH_SSPI` (MS SSPI) object. If there is no supported API available, ``None`` will be returned. 
@@ -110,8 +110,8 @@ def GSSAuth(auth_method, gss_deleg_creds=True): class _SSH_GSSAuth(object): """ - Contains the shared variables and methods of `._SSH_GSSAPI_*` and - `._SSH_SSPI`. + Contains the shared variables and methods of `._SSH_GSSAPI_OLD`, + `._SSH_GSSAPI_NEW` and `._SSH_SSPI`. """ def __init__(self, auth_method, gss_deleg_creds): @@ -451,13 +451,12 @@ def ssh_init_sec_context(self, target, desired_mech=None, ("pseudo negotiated" mechanism, because we support just the krb5 mechanism :-)) :param str recv_token: The GSS-API token received from the Server - :raise SSHException: Is raised if the desired mechanism of the client - is not supported - :raise gssapi.exceptions.GSSError: if there is an error signaled by the - GSS-API implementation - :return: A ``String`` if the GSS-API has returned a token or ``None`` if - no token was returned - :rtype: String or None + :raises: `.SSHException` -- Is raised if the desired mechanism of the + client is not supported + :raises: ``gssapi.exceptions.GSSError`` if there is an error signaled + by the GSS-API implementation + :return: A ``String`` if the GSS-API has returned a token or ``None`` + if no token was returned """ self._username = username self._gss_host = target @@ -492,8 +491,7 @@ def ssh_get_mic(self, session_id, gss_kex=False): gssapi-keyex: Returns the MIC token from GSS-API with the SSH session ID as message. - :rtype: String - :see: `._ssh_build_mic` + :rtype: str """ self._session_id = session_id if not gss_kex: @@ -517,7 +515,6 @@ def ssh_accept_sec_context(self, hostname, recv_token, username=None): if it's not the initial call. 
:return: A ``String`` if the GSS-API has returned a token or ``None`` if no token was returned - :rtype: String or None """ # hostname and username are not required for GSSAPI, but for SSPI self._gss_host = hostname @@ -536,7 +533,7 @@ def ssh_check_mic(self, mic_token, session_id, username=None): :param str session_id: The SSH session ID :param str username: The name of the user who attempts to login :return: None if the MIC check was successful - :raises gssapi.exceptions.GSSError: if the MIC check failed + :raises: ``gssapi.exceptions.GSSError`` -- if the MIC check failed """ self._session_id = session_id self._username = username @@ -572,8 +569,8 @@ def save_client_creds(self, client_token): (server mode). :param str client_token: The GSS-API token received form the client - :raise NotImplementedError: Credential delegation is currently not - supported in server mode + :raises: ``NotImplementedError`` -- Credential delegation is currently + not supported in server mode """ raise NotImplementedError From a36499fd8762a19da43ee16429b148cb89f4d39f Mon Sep 17 00:00:00 2001 From: Anselm Kruis Date: Fri, 5 Oct 2018 18:15:24 +0200 Subject: [PATCH 4/8] Use k5test (if available) to execute GSSAPI related tests Previously testing of GSSAPI (Kerberos) related functions required an externally provided Kerberos environment. Therefore all GSSAPI tests were skipped. Now the package k5test is used to setup a self-contained Kerberos environment. Because k5test requires the new GSSAPI, this commit also merges pull request #1166 and fixes broken GSSAPI test. If k5test is not available (i.e. on Windows), the tests still get skipped. The test case test_kex_gss.test_2_gsskex_and_auth_rekey is expected to fail. 
--- tests/test_gssapi.py | 41 ++++++++++++------- tests/test_kex_gss.py | 20 ++++------ tests/test_ssh_gss.py | 15 +++---- tests/util.py | 93 ++++++++++++++++++++++++++++++++++++++++--- 4 files changed, 131 insertions(+), 38 deletions(-) diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py index 04304c0f2..98d4d14ec 100644 --- a/tests/test_gssapi.py +++ b/tests/test_gssapi.py @@ -22,20 +22,19 @@ Test the used APIs for GSS-API / SSPI authentication """ -import unittest import socket -from .util import needs_gssapi +from .util import needs_gssapi, KerberosTestCase, update_env @needs_gssapi -class GSSAPITest(unittest.TestCase): - def setup(): - # TODO: these vars should all come from os.environ or whatever the - # approved pytest method is for runtime-configuring test data. +class GSSAPITest(KerberosTestCase): + def setUp(self): + super(GSSAPITest, self).setUp() self.krb5_mech = "1.2.840.113554.1.2.2" - self.targ_name = "hostname" + self.targ_name = self.realm.hostname self.server_mode = False + update_env(self, self.realm.env) def test_1_pyasn1(self): """ @@ -48,13 +47,14 @@ def test_1_pyasn1(self): mech, __ = decoder.decode(oid) self.assertEquals(self.krb5_mech, mech.__str__()) - def test_2_gssapi_sspi(self): + def _gssapi_sspi_test(self): """ Test the used methods of python-gssapi or sspi, sspicon from pywin32. 
""" try: import gssapi - if hasattr(gssapi, '__title__') and gssapi.__title__ == 'python-gssapi': + if (hasattr(gssapi, '__title__') and + gssapi.__title__ == 'python-gssapi'): _API = "PYTHON-GSSAPI-OLD" else: _API = "PYTHON-GSSAPI-NEW" @@ -117,7 +117,7 @@ def test_2_gssapi_sspi(self): status = gss_srv_ctxt.verify_mic(mic_msg, mic_token) self.assertEquals(0, status) elif _API == "PYTHON-GSSAPI-NEW": - if server_mode: + if self.server_mode: gss_flags = (gssapi.RequirementFlag.protection_ready, gssapi.RequirementFlag.integrity, gssapi.RequirementFlag.mutual_authentication, @@ -128,13 +128,13 @@ def test_2_gssapi_sspi(self): gssapi.RequirementFlag.delegate_to_peer) # Initialize a GSS-API context. krb5_oid = gssapi.MechType.kerberos - target_name = gssapi.Name("host@" + targ_name, - name_type=gssapi.NameType.hostbased_service) + target_name = gssapi.Name("host@" + self.targ_name, + name_type=gssapi.NameType.hostbased_service) gss_ctxt = gssapi.SecurityContext(name=target_name, flags=gss_flags, mech=krb5_oid, usage='initiate') - if server_mode: + if self.server_mode: c_token = gss_ctxt.step(c_token) gss_ctxt_status = gss_ctxt.complete self.assertEquals(False, gss_ctxt_status) @@ -154,7 +154,7 @@ def test_2_gssapi_sspi(self): # Build MIC mic_token = gss_ctxt.get_signature(mic_msg) - if server_mode: + if self.server_mode: # Check MIC status = gss_srv_ctxt.verify_signature(mic_msg, mic_token) self.assertEquals(0, status) @@ -190,3 +190,16 @@ def test_2_gssapi_sspi(self): error, token = gss_ctxt.authorize(c_token) c_token = token[0].Buffer self.assertNotEquals(0, error) + + def test_2_gssapi_sspi_client(self): + """ + Test the used methods of python-gssapi or sspi, sspicon from pywin32. + """ + self._gssapi_sspi_test() + + def test_3_gssapi_sspi_server(self): + """ + Test the used methods of python-gssapi or sspi, sspicon from pywin32. 
+ """ + self.server_mode = True + self._gssapi_sspi_test() diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py index c71ff91c2..e58be65d9 100644 --- a/tests/test_kex_gss.py +++ b/tests/test_kex_gss.py @@ -31,7 +31,7 @@ import paramiko -from .util import needs_gssapi +from .util import needs_gssapi, KerberosTestCase, update_env class NullServer(paramiko.ServerInterface): @@ -59,21 +59,16 @@ def check_channel_exec_request(self, channel, command): @needs_gssapi -class GSSKexTest(unittest.TestCase): - @staticmethod - def init(username, hostname): - global krb5_principal, targ_name - krb5_principal = username - targ_name = hostname - +class GSSKexTest(KerberosTestCase): def setUp(self): - self.username = krb5_principal - self.hostname = socket.getfqdn(targ_name) + self.username = self.realm.user_princ + self.hostname = socket.getfqdn(self.realm.hostname) self.sockl = socket.socket() - self.sockl.bind((targ_name, 0)) + self.sockl.bind((self.realm.hostname, 0)) self.sockl.listen(1) self.addr, self.port = self.sockl.getsockname() self.event = threading.Event() + update_env(self, self.realm.env) thread = threading.Thread(target=self._run) thread.start() @@ -87,7 +82,7 @@ def _run(self): self.ts = paramiko.Transport(self.socks, gss_kex=True) host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key") self.ts.add_server_key(host_key) - self.ts.set_gss_host(targ_name) + self.ts.set_gss_host(self.realm.hostname) try: self.ts.load_server_moduli() except: @@ -150,6 +145,7 @@ def test_1_gsskex_and_auth(self): """ self._test_gsskex_and_auth(gss_host=None) + @unittest.expectedFailure # to be investigated def test_2_gsskex_and_auth_rekey(self): """ Verify that Paramiko can rekey. 
diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py index b6b501528..d326f522e 100644 --- a/tests/test_ssh_gss.py +++ b/tests/test_ssh_gss.py @@ -25,11 +25,10 @@ import socket import threading -import unittest import paramiko -from .util import _support, needs_gssapi +from .util import _support, needs_gssapi, KerberosTestCase, update_env from .test_client import FINGERPRINTS @@ -67,17 +66,18 @@ def check_channel_exec_request(self, channel, command): @needs_gssapi -class GSSAuthTest(unittest.TestCase): +class GSSAuthTest(KerberosTestCase): def setUp(self): # TODO: username and targ_name should come from os.environ or whatever # the approved pytest method is for runtime-configuring test data. - self.username = "krb5_principal" - self.hostname = socket.getfqdn("targ_name") + self.username = self.realm.user_princ + self.hostname = socket.getfqdn(self.realm.hostname) self.sockl = socket.socket() - self.sockl.bind(("targ_name", 0)) + self.sockl.bind((self.realm.hostname, 0)) self.sockl.listen(1) self.addr, self.port = self.sockl.getsockname() self.event = threading.Event() + update_env(self, self.realm.env) thread = threading.Thread(target=self._run) thread.start() @@ -148,7 +148,8 @@ def test_1_gss_auth(self): def test_2_auth_trickledown(self): """ - Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding + Failed gssapi-with-mic auth doesn't prevent subsequent key auth from + succeeding """ self.hostname = ( "this_host_does_not_exists_and_causes_a_GSSAPI-exception" diff --git a/tests/util.py b/tests/util.py index 4ca023743..be56b37dc 100644 --- a/tests/util.py +++ b/tests/util.py @@ -1,19 +1,20 @@ from os.path import dirname, realpath, join +import os +import sys +import unittest import pytest from paramiko.py3compat import builtins +from paramiko.ssh_gss import GSS_AUTH_AVAILABLE def _support(filename): return join(dirname(realpath(__file__)), filename) -# TODO: consider using pytest.importorskip('gssapi') instead? 
We presumably -# still need CLI configurability for the Kerberos parameters, though, so can't -# JUST key off presence of GSSAPI optional dependency... -# TODO: anyway, s/True/os.environ.get('RUN_GSSAPI', False)/ or something. -needs_gssapi = pytest.mark.skipif(True, reason="No GSSAPI to test") +needs_gssapi = pytest.mark.skipif(not GSS_AUTH_AVAILABLE, + reason="No GSSAPI to test") def needs_builtin(name): @@ -25,3 +26,85 @@ def needs_builtin(name): slow = pytest.mark.slow + +# GSSAPI / Kerberos related tests need a working Kerberos environment. +# The class `KerberosTestCase` provides such an environment or skips all tests. +# There are 3 distinct cases: +# +# - A Kerberos environment has already been created and the environment +# contains the required information. +# +# - We can use the package 'k5test' to setup an working kerberos environment on +# the fly. +# +# - We skip all tests. +# +# ToDo: add a Windows specific implementation? + +if (os.environ.get("K5TEST_USER_PRINC", None) and + os.environ.get("K5TEST_HOSTNAME", None) and + os.environ.get("KRB5_KTNAME", None)): # add other vars as needed + + # The environment provides the required information + class DummyK5Realm(object): + def __init__(self): + for k in os.environ: + if not k.startswith("K5TEST_"): + continue + setattr(self, k[7:].lower(), os.environ[k]) + self.env = {} + + class KerberosTestCase(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.realm = DummyK5Realm() + + @classmethod + def tearDownClass(cls): + del cls.realm +else: + try: + # Try to setup a kerberos environment + from k5test import KerberosTestCase + except Exception: + # Use a dummy, that skips all tests + class KerberosTestCase(unittest.TestCase): + @classmethod + def setUpClass(cls): + raise unittest.SkipTest('Missing extension package k5test. 
' + 'Please run "pip install k5test" ' + 'to install it.') + +def update_env(testcase, mapping, env=os.environ): + """Modify os.environ during a test case and restore during cleanup.""" + saved_env = env.copy() + def replace(target, source): + target.update(source) + for k in list(target): + if k not in source: + target.pop(k, None) + testcase.addCleanup(replace, env, saved_env) + env.update(mapping) + return testcase + +def k5shell(args=None): + """Create a shell with an kerberos environment + + This can be used to debug paramiko or to test the old GSSAPI. + To test a different GSSAPI, simply activate a suitable venv + within the shell. + """ + import k5test + import atexit + import subprocess + k5 = k5test.K5Realm() + atexit.register(k5.stop) + os.environ.update(k5.env) + for n in ("realm", "user_princ", "hostname"): + os.environ["K5TEST_" + n.upper()] = getattr(k5, n) + + if not args: + args = sys.argv[1:] + if not args: + args = [os.environ.get("SHELL", "bash")] + sys.exit(subprocess.call(args)) From a8e8f9aa89c2c1fe65e4477d8d553eb5e669c927 Mon Sep 17 00:00:00 2001 From: Anselm Kruis Date: Fri, 5 Oct 2018 19:30:48 +0200 Subject: [PATCH 5/8] Reformatted as proposed by travis.blacken --- paramiko/ssh_gss.py | 74 +++++++++++++++++++++++++++----------------- tests/test_gssapi.py | 43 +++++++++++++++---------- tests/util.py | 28 ++++++++++++----- 3 files changed, 92 insertions(+), 53 deletions(-) diff --git a/paramiko/ssh_gss.py b/paramiko/ssh_gss.py index ff2fa065b..06aac761c 100644 --- a/paramiko/ssh_gss.py +++ b/paramiko/ssh_gss.py @@ -51,14 +51,17 @@ try: import gssapi - if hasattr(gssapi, '__title__') and gssapi.__title__ == 'python-gssapi': + + if hasattr(gssapi, "__title__") and gssapi.__title__ == "python-gssapi": # old, unmaintained python-gssapi package _API = "MIT" # keep this for compatibility GSS_EXCEPTIONS = (gssapi.GSSException,) else: _API = "PYTHON-GSSAPI-NEW" - GSS_EXCEPTIONS = (gssapi.exceptions.GeneralError, - gssapi.raw.misc.GSSError,) + 
GSS_EXCEPTIONS = ( + gssapi.exceptions.GeneralError, + gssapi.raw.misc.GSSError, + ) except (ImportError, OSError): try: import pywintypes @@ -422,6 +425,7 @@ class _SSH_GSSAPI_NEW(_SSH_GSSAuth): :see: `.GSSAuth` """ + def __init__(self, auth_method, gss_deleg_creds): """ :param str auth_method: The name of the SSH authentication mechanism @@ -431,17 +435,22 @@ def __init__(self, auth_method, gss_deleg_creds): _SSH_GSSAuth.__init__(self, auth_method, gss_deleg_creds) if self._gss_deleg_creds: - self._gss_flags = (gssapi.RequirementFlag.protection_ready, - gssapi.RequirementFlag.integrity, - gssapi.RequirementFlag.mutual_authentication, - gssapi.RequirementFlag.delegate_to_peer) + self._gss_flags = ( + gssapi.RequirementFlag.protection_ready, + gssapi.RequirementFlag.integrity, + gssapi.RequirementFlag.mutual_authentication, + gssapi.RequirementFlag.delegate_to_peer, + ) else: - self._gss_flags = (gssapi.RequirementFlag.protection_ready, - gssapi.RequirementFlag.integrity, - gssapi.RequirementFlag.mutual_authentication) + self._gss_flags = ( + gssapi.RequirementFlag.protection_ready, + gssapi.RequirementFlag.integrity, + gssapi.RequirementFlag.mutual_authentication, + ) - def ssh_init_sec_context(self, target, desired_mech=None, - username=None, recv_token=None): + def ssh_init_sec_context( + self, target, desired_mech=None, username=None, recv_token=None + ): """ Initialize a GSS-API context. 
@@ -460,8 +469,10 @@ def ssh_init_sec_context(self, target, desired_mech=None, """ self._username = username self._gss_host = target - targ_name = gssapi.Name("host@" + self._gss_host, - name_type=gssapi.NameType.hostbased_service) + targ_name = gssapi.Name( + "host@" + self._gss_host, + name_type=gssapi.NameType.hostbased_service, + ) if desired_mech is not None: mech, __ = decoder.decode(desired_mech) if mech.__str__() != self._krb5_mech: @@ -469,10 +480,12 @@ def ssh_init_sec_context(self, target, desired_mech=None, krb5_mech = gssapi.MechType.kerberos token = None if recv_token is None: - self._gss_ctxt = gssapi.SecurityContext(name=targ_name, - flags=self._gss_flags, - mech=krb5_mech, - usage='initiate') + self._gss_ctxt = gssapi.SecurityContext( + name=targ_name, + flags=self._gss_flags, + mech=krb5_mech, + usage="initiate", + ) token = self._gss_ctxt.step(token) else: token = self._gss_ctxt.step(recv_token) @@ -495,10 +508,12 @@ def ssh_get_mic(self, session_id, gss_kex=False): """ self._session_id = session_id if not gss_kex: - mic_field = self._ssh_build_mic(self._session_id, - self._username, - self._service, - self._auth_method) + mic_field = self._ssh_build_mic( + self._session_id, + self._username, + self._service, + self._auth_method, + ) mic_token = self._gss_ctxt.get_signature(mic_field) else: # for key exchange with gssapi-keyex @@ -520,7 +535,7 @@ def ssh_accept_sec_context(self, hostname, recv_token, username=None): self._gss_host = hostname self._username = username if self._gss_srv_ctxt is None: - self._gss_srv_ctxt = gssapi.SecurityContext(usage='accept') + self._gss_srv_ctxt = gssapi.SecurityContext(usage="accept") token = self._gss_srv_ctxt.step(recv_token) self._gss_srv_ctxt_status = self._gss_srv_ctxt.complete return token @@ -539,16 +554,17 @@ def ssh_check_mic(self, mic_token, session_id, username=None): self._username = username if self._username is not None: # server mode - mic_field = self._ssh_build_mic(self._session_id, - 
self._username, - self._service, - self._auth_method) + mic_field = self._ssh_build_mic( + self._session_id, + self._username, + self._service, + self._auth_method, + ) self._gss_srv_ctxt.verify_signature(mic_field, mic_token) else: # for key exchange with gssapi-keyex # client mode - self._gss_ctxt.verify_signature(self._session_id, - mic_token) + self._gss_ctxt.verify_signature(self._session_id, mic_token) @property def credentials_delegated(self): diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py index 98d4d14ec..8e6ec37ab 100644 --- a/tests/test_gssapi.py +++ b/tests/test_gssapi.py @@ -53,8 +53,11 @@ def _gssapi_sspi_test(self): """ try: import gssapi - if (hasattr(gssapi, '__title__') and - gssapi.__title__ == 'python-gssapi'): + + if ( + hasattr(gssapi, "__title__") + and gssapi.__title__ == "python-gssapi" + ): _API = "PYTHON-GSSAPI-OLD" else: _API = "PYTHON-GSSAPI-NEW" @@ -118,28 +121,36 @@ def _gssapi_sspi_test(self): self.assertEquals(0, status) elif _API == "PYTHON-GSSAPI-NEW": if self.server_mode: - gss_flags = (gssapi.RequirementFlag.protection_ready, - gssapi.RequirementFlag.integrity, - gssapi.RequirementFlag.mutual_authentication, - gssapi.RequirementFlag.delegate_to_peer) + gss_flags = ( + gssapi.RequirementFlag.protection_ready, + gssapi.RequirementFlag.integrity, + gssapi.RequirementFlag.mutual_authentication, + gssapi.RequirementFlag.delegate_to_peer, + ) else: - gss_flags = (gssapi.RequirementFlag.protection_ready, - gssapi.RequirementFlag.integrity, - gssapi.RequirementFlag.delegate_to_peer) + gss_flags = ( + gssapi.RequirementFlag.protection_ready, + gssapi.RequirementFlag.integrity, + gssapi.RequirementFlag.delegate_to_peer, + ) # Initialize a GSS-API context. 
krb5_oid = gssapi.MechType.kerberos - target_name = gssapi.Name("host@" + self.targ_name, - name_type=gssapi.NameType.hostbased_service) - gss_ctxt = gssapi.SecurityContext(name=target_name, - flags=gss_flags, - mech=krb5_oid, - usage='initiate') + target_name = gssapi.Name( + "host@" + self.targ_name, + name_type=gssapi.NameType.hostbased_service, + ) + gss_ctxt = gssapi.SecurityContext( + name=target_name, + flags=gss_flags, + mech=krb5_oid, + usage="initiate", + ) if self.server_mode: c_token = gss_ctxt.step(c_token) gss_ctxt_status = gss_ctxt.complete self.assertEquals(False, gss_ctxt_status) # Accept a GSS-API context. - gss_srv_ctxt = gssapi.SecurityContext(usage='accept') + gss_srv_ctxt = gssapi.SecurityContext(usage="accept") s_token = gss_srv_ctxt.step(c_token) gss_ctxt_status = gss_srv_ctxt.complete self.assertNotEquals(None, s_token) diff --git a/tests/util.py b/tests/util.py index be56b37dc..cdc835c95 100644 --- a/tests/util.py +++ b/tests/util.py @@ -13,8 +13,9 @@ def _support(filename): return join(dirname(realpath(__file__)), filename) -needs_gssapi = pytest.mark.skipif(not GSS_AUTH_AVAILABLE, - reason="No GSSAPI to test") +needs_gssapi = pytest.mark.skipif( + not GSS_AUTH_AVAILABLE, reason="No GSSAPI to test" +) def needs_builtin(name): @@ -41,9 +42,11 @@ def needs_builtin(name): # # ToDo: add a Windows specific implementation? 
-if (os.environ.get("K5TEST_USER_PRINC", None) and - os.environ.get("K5TEST_HOSTNAME", None) and - os.environ.get("KRB5_KTNAME", None)): # add other vars as needed +if ( + os.environ.get("K5TEST_USER_PRINC", None) + and os.environ.get("K5TEST_HOSTNAME", None) + and os.environ.get("KRB5_KTNAME", None) +): # add other vars as needed # The environment provides the required information class DummyK5Realm(object): @@ -62,6 +65,8 @@ def setUpClass(cls): @classmethod def tearDownClass(cls): del cls.realm + + else: try: # Try to setup a kerberos environment @@ -71,22 +76,28 @@ def tearDownClass(cls): class KerberosTestCase(unittest.TestCase): @classmethod def setUpClass(cls): - raise unittest.SkipTest('Missing extension package k5test. ' - 'Please run "pip install k5test" ' - 'to install it.') + raise unittest.SkipTest( + "Missing extension package k5test. " + 'Please run "pip install k5test" ' + "to install it." + ) + def update_env(testcase, mapping, env=os.environ): """Modify os.environ during a test case and restore during cleanup.""" saved_env = env.copy() + def replace(target, source): target.update(source) for k in list(target): if k not in source: target.pop(k, None) + testcase.addCleanup(replace, env, saved_env) env.update(mapping) return testcase + def k5shell(args=None): """Create a shell with an kerberos environment @@ -97,6 +108,7 @@ def k5shell(args=None): import k5test import atexit import subprocess + k5 = k5test.K5Realm() atexit.register(k5.stop) os.environ.update(k5.env) From 1694e6e46032c63ab9e1015adedda0cf1cc14912 Mon Sep 17 00:00:00 2001 From: Anselm Kruis Date: Sat, 6 Oct 2018 16:09:33 +0200 Subject: [PATCH 6/8] fix GSSAPI tests for Python3 (trivial) --- tests/test_kex_gss.py | 2 +- tests/test_ssh_gss.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py index e58be65d9..7e53795f1 100644 --- a/tests/test_kex_gss.py +++ b/tests/test_kex_gss.py @@ -53,7 +53,7 @@ def 
check_channel_request(self, kind, chanid): return paramiko.OPEN_SUCCEEDED def check_channel_exec_request(self, channel, command): - if command != "yes": + if command != b"yes": return False return True diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py index d326f522e..8e4cb962f 100644 --- a/tests/test_ssh_gss.py +++ b/tests/test_ssh_gss.py @@ -60,7 +60,7 @@ def check_channel_request(self, kind, chanid): return paramiko.OPEN_SUCCEEDED def check_channel_exec_request(self, channel, command): - if command != "yes": + if command != b"yes": return False return True From f3af0b3e697adc8902039b21fde93871048160e4 Mon Sep 17 00:00:00 2001 From: Anselm Kruis Date: Mon, 8 Oct 2018 09:17:50 +0200 Subject: [PATCH 8/8] Update a comment --- tests/test_kex_gss.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py index 7e53795f1..6bac01201 100644 --- a/tests/test_kex_gss.py +++ b/tests/test_kex_gss.py @@ -145,7 +145,7 @@ def test_1_gsskex_and_auth(self): """ self._test_gsskex_and_auth(gss_host=None) - @unittest.expectedFailure # to be investigated + @unittest.expectedFailure # to be investigated, see https://github.com/paramiko/paramiko/issues/1312 def test_2_gsskex_and_auth_rekey(self): """ Verify that Paramiko can rekey.