Index: paramiko-2.5.0/paramiko/ssh_gss.py =================================================================== --- paramiko-2.5.0.orig/paramiko/ssh_gss.py +++ paramiko-2.5.0/paramiko/ssh_gss.py @@ -43,12 +43,21 @@ GSS_EXCEPTIONS = () #: :var str _API: Constraint for the used API -_API = "MIT" +_API = None try: import gssapi - GSS_EXCEPTIONS = (gssapi.GSSException,) + if hasattr(gssapi, "__title__") and gssapi.__title__ == "python-gssapi": + # old, unmaintained python-gssapi package + _API = "MIT" # keep this for compatibility + GSS_EXCEPTIONS = (gssapi.GSSException,) + else: + _API = "PYTHON-GSSAPI-NEW" + GSS_EXCEPTIONS = ( + gssapi.exceptions.GeneralError, + gssapi.raw.misc.GSSError, + ) except (ImportError, OSError): try: import pywintypes @@ -63,6 +72,7 @@ except (ImportError, OSError): from paramiko.common import MSG_USERAUTH_REQUEST from paramiko.ssh_exception import SSHException +from paramiko._version import __version_info__ def GSSAuth(auth_method, gss_deleg_creds=True): @@ -73,21 +83,24 @@ def GSSAuth(auth_method, gss_deleg_creds (gssapi-with-mic or gss-keyex) :param bool gss_deleg_creds: Delegate client credentials or not. We delegate credentials by default. - :return: Either an `._SSH_GSSAPI` (Unix) object or an - `_SSH_SSPI` (Windows) object + :return: Either an `._SSH_GSSAPI_OLD` or `._SSH_GSSAPI_NEW` (Unix) + object or an `_SSH_SSPI` (Windows) object + :rtype: object :raises: ``ImportError`` -- If no GSS-API / SSPI module could be imported. :see: `RFC 4462 `_ - :note: Check for the available API and return either an `._SSH_GSSAPI` - (MIT GSSAPI) object or an `._SSH_SSPI` (MS SSPI) object. If you - get python-gssapi working on Windows, python-gssapi - will be used and a `._SSH_GSSAPI` object will be returned. + :note: Check for the available API and return either an `._SSH_GSSAPI_OLD` + (MIT GSSAPI using python-gssapi package) object, an + `._SSH_GSSAPI_NEW` (MIT GSSAPI using gssapi package) object + or an `._SSH_SSPI` (MS SSPI) object. If there is no supported API available, ``None`` will be returned. """ if _API == "MIT": - return _SSH_GSSAPI(auth_method, gss_deleg_creds) + return _SSH_GSSAPI_OLD(auth_method, gss_deleg_creds) + elif _API == "PYTHON-GSSAPI-NEW": + return _SSH_GSSAPI_NEW(auth_method, gss_deleg_creds) elif _API == "SSPI" and os.name == "nt": return _SSH_SSPI(auth_method, gss_deleg_creds) else: @@ -96,8 +109,8 @@ def GSSAuth(auth_method, gss_deleg_creds class _SSH_GSSAuth(object): """ - Contains the shared variables and methods of `._SSH_GSSAPI` and - `._SSH_SSPI`. + Contains the shared variables and methods of `._SSH_GSSAPI_OLD`, + `._SSH_GSSAPI_NEW` and `._SSH_SSPI`. """ def __init__(self, auth_method, gss_deleg_creds): @@ -223,9 +236,10 @@ class _SSH_GSSAuth(object): return mic -class _SSH_GSSAPI(_SSH_GSSAuth): +class _SSH_GSSAPI_OLD(_SSH_GSSAuth): """ - Implementation of the GSS-API MIT Kerberos Authentication for SSH2. + Implementation of the GSS-API MIT Kerberos Authentication for SSH2, + using the older (unmaintained) python-gssapi package. :see: `.GSSAuth` """ @@ -401,6 +415,184 @@ class _SSH_GSSAPI(_SSH_GSSAuth): """ raise NotImplementedError + +if __version_info__[0] == 2 and __version_info__[0] <= 4: + # provide the old name for strict backward compatibility + _SSH_GSSAPI = _SSH_GSSAPI_OLD + + +class _SSH_GSSAPI_NEW(_SSH_GSSAuth): + """ + Implementation of the GSS-API MIT Kerberos Authentication for SSH2, + using the newer, currently maintained gssapi package. + + :see: `.GSSAuth` + """ + + def __init__(self, auth_method, gss_deleg_creds): + """ + :param str auth_method: The name of the SSH authentication mechanism + (gssapi-with-mic or gss-keyex) + :param bool gss_deleg_creds: Delegate client credentials or not + """ + _SSH_GSSAuth.__init__(self, auth_method, gss_deleg_creds) + + if self._gss_deleg_creds: + self._gss_flags = ( + gssapi.RequirementFlag.protection_ready, + gssapi.RequirementFlag.integrity, + gssapi.RequirementFlag.mutual_authentication, + gssapi.RequirementFlag.delegate_to_peer, + ) + else: + self._gss_flags = ( + gssapi.RequirementFlag.protection_ready, + gssapi.RequirementFlag.integrity, + gssapi.RequirementFlag.mutual_authentication, + ) + + def ssh_init_sec_context( + self, target, desired_mech=None, username=None, recv_token=None + ): + """ + Initialize a GSS-API context. + + :param str username: The name of the user who attempts to login + :param str target: The hostname of the target to connect to + :param str desired_mech: The negotiated GSS-API mechanism + ("pseudo negotiated" mechanism, because we + support just the krb5 mechanism :-)) + :param str recv_token: The GSS-API token received from the Server + :raises: `.SSHException` -- Is raised if the desired mechanism of the + client is not supported + :raises: ``gssapi.exceptions.GSSError`` if there is an error signaled + by the GSS-API implementation + :return: A ``String`` if the GSS-API has returned a token or ``None`` + if no token was returned + """ + self._username = username + self._gss_host = target + targ_name = gssapi.Name( + "host@" + self._gss_host, + name_type=gssapi.NameType.hostbased_service, + ) + if desired_mech is not None: + mech, __ = decoder.decode(desired_mech) + if mech.__str__() != self._krb5_mech: + raise SSHException("Unsupported mechanism OID.") + krb5_mech = gssapi.MechType.kerberos + token = None + if recv_token is None: + self._gss_ctxt = gssapi.SecurityContext( + name=targ_name, + flags=self._gss_flags, + mech=krb5_mech, + usage="initiate", + ) + token = self._gss_ctxt.step(token) + else: + token = self._gss_ctxt.step(recv_token) + self._gss_ctxt_status = self._gss_ctxt.complete + return token + + def ssh_get_mic(self, session_id, gss_kex=False): + """ + Create the MIC token for a SSH2 message. + + :param str session_id: The SSH session ID + :param bool gss_kex: Generate the MIC for GSS-API Key Exchange or not + :return: gssapi-with-mic: + Returns the MIC token from GSS-API for the message we created + with ``_ssh_build_mic``. + gssapi-keyex: + Returns the MIC token from GSS-API with the SSH session ID as + message. + :rtype: str + """ + self._session_id = session_id + if not gss_kex: + mic_field = self._ssh_build_mic( + self._session_id, + self._username, + self._service, + self._auth_method, + ) + mic_token = self._gss_ctxt.get_signature(mic_field) + else: + # for key exchange with gssapi-keyex + mic_token = self._gss_srv_ctxt.get_signature(self._session_id) + return mic_token + + def ssh_accept_sec_context(self, hostname, recv_token, username=None): + """ + Accept a GSS-API context (server mode). + + :param str hostname: The servers hostname + :param str username: The name of the user who attempts to login + :param str recv_token: The GSS-API Token received from the server, + if it's not the initial call. + :return: A ``String`` if the GSS-API has returned a token or ``None`` + if no token was returned + """ + # hostname and username are not required for GSSAPI, but for SSPI + self._gss_host = hostname + self._username = username + if self._gss_srv_ctxt is None: + self._gss_srv_ctxt = gssapi.SecurityContext(usage="accept") + token = self._gss_srv_ctxt.step(recv_token) + self._gss_srv_ctxt_status = self._gss_srv_ctxt.complete + return token + + def ssh_check_mic(self, mic_token, session_id, username=None): + """ + Verify the MIC token for a SSH2 message. + + :param str mic_token: The MIC token received from the client + :param str session_id: The SSH session ID + :param str username: The name of the user who attempts to login + :return: None if the MIC check was successful + :raises: ``gssapi.exceptions.GSSError`` -- if the MIC check failed + """ + self._session_id = session_id + self._username = username + if self._username is not None: + # server mode + mic_field = self._ssh_build_mic( + self._session_id, + self._username, + self._service, + self._auth_method, + ) + self._gss_srv_ctxt.verify_signature(mic_field, mic_token) + else: + # for key exchange with gssapi-keyex + # client mode + self._gss_ctxt.verify_signature(self._session_id, mic_token) + + @property + def credentials_delegated(self): + """ + Checks if credentials are delegated (server mode). + + :return: ``True`` if credentials are delegated, otherwise ``False`` + :rtype: bool + """ + if self._gss_srv_ctxt.delegated_creds is not None: + return True + return False + + def save_client_creds(self, client_token): + """ + Save the Client token in a file. This is used by the SSH server + to store the client credentials if credentials are delegated + (server mode). + + :param str client_token: The GSS-API token received form the client + :raises: ``NotImplementedError`` -- Credential delegation is currently + not supported in server mode + """ + raise NotImplementedError + class _SSH_SSPI(_SSH_GSSAuth): """ Index: paramiko-2.5.0/tests/test_gssapi.py =================================================================== --- paramiko-2.5.0.orig/tests/test_gssapi.py +++ paramiko-2.5.0/tests/test_gssapi.py @@ -22,20 +22,20 @@ Test the used APIs for GSS-API / SSPI authentication """ -import unittest import socket -from .util import needs_gssapi - +from .util import needs_gssapi, KerberosTestCase, update_env @needs_gssapi -class GSSAPITest(unittest.TestCase): - def setup(self): +class GSSAPITest(KerberosTestCase): + def setUp(self): + super(GSSAPITest, self).setUp() # TODO: these vars should all come from os.environ or whatever the # approved pytest method is for runtime-configuring test data. self.krb5_mech = "1.2.840.113554.1.2.2" - self.targ_name = "hostname" + self.targ_name = self.realm.hostname self.server_mode = False + update_env(self, self.realm.env) def test_pyasn1(self): """ @@ -48,13 +48,16 @@ class GSSAPITest(unittest.TestCase): mech, __ = decoder.decode(oid) self.assertEquals(self.krb5_mech, mech.__str__()) - def test_gssapi_sspi(self): + def _gssapi_sspi_test(self): """ Test the used methods of python-gssapi or sspi, sspicon from pywin32. """ - _API = "MIT" try: import gssapi + if hasattr(gssapi, '__title__') and gssapi.__title__ == 'python-gssapi': + _API = "PYTHON-GSSAPI-OLD" + else: + _API = "PYTHON-GSSAPI-NEW" except ImportError: import sspicon import sspi @@ -65,7 +68,7 @@ class GSSAPITest(unittest.TestCase): gss_ctxt_status = False mic_msg = b"G'day Mate!" - if _API == "MIT": + if _API == "PYTHON-GSSAPI-OLD": if self.server_mode: gss_flags = ( gssapi.C_PROT_READY_FLAG, @@ -113,6 +116,56 @@ class GSSAPITest(unittest.TestCase): # Check MIC status = gss_srv_ctxt.verify_mic(mic_msg, mic_token) self.assertEquals(0, status) + elif _API == "PYTHON-GSSAPI-NEW": + if self.server_mode: + gss_flags = ( + gssapi.RequirementFlag.protection_ready, + gssapi.RequirementFlag.integrity, + gssapi.RequirementFlag.mutual_authentication, + gssapi.RequirementFlag.delegate_to_peer, + ) + else: + gss_flags = ( + gssapi.RequirementFlag.protection_ready, + gssapi.RequirementFlag.integrity, + gssapi.RequirementFlag.delegate_to_peer, + ) + # Initialize a GSS-API context. + krb5_oid = gssapi.MechType.kerberos + target_name = gssapi.Name( + "host@" + self.targ_name, + name_type=gssapi.NameType.hostbased_service, + ) + gss_ctxt = gssapi.SecurityContext( + name=target_name, + flags=gss_flags, + mech=krb5_oid, + usage="initiate", + ) + if self.server_mode: + c_token = gss_ctxt.step(c_token) + gss_ctxt_status = gss_ctxt.complete + self.assertEquals(False, gss_ctxt_status) + # Accept a GSS-API context. + gss_srv_ctxt = gssapi.SecurityContext(usage="accept") + s_token = gss_srv_ctxt.step(c_token) + gss_ctxt_status = gss_srv_ctxt.complete + self.assertNotEquals(None, s_token) + self.assertEquals(True, gss_ctxt_status) + # Establish the client context + c_token = gss_ctxt.step(s_token) + self.assertEquals(None, c_token) + else: + while not gss_ctxt.complete: + c_token = gss_ctxt.step(c_token) + self.assertNotEquals(None, c_token) + # Build MIC + mic_token = gss_ctxt.get_signature(mic_msg) + + if self.server_mode: + # Check MIC + status = gss_srv_ctxt.verify_signature(mic_msg, mic_token) + self.assertEquals(0, status) else: gss_flags = ( sspicon.ISC_REQ_INTEGRITY @@ -145,3 +198,16 @@ class GSSAPITest(unittest.TestCase): error, token = gss_ctxt.authorize(c_token) c_token = token[0].Buffer self.assertNotEquals(0, error) + + def test_2_gssapi_sspi_client(self): + """ + Test the used methods of python-gssapi or sspi, sspicon from pywin32. + """ + self._gssapi_sspi_test() + + def test_3_gssapi_sspi_server(self): + """ + Test the used methods of python-gssapi or sspi, sspicon from pywin32. + """ + self.server_mode = True + self._gssapi_sspi_test() Index: paramiko-2.5.0/tests/test_kex_gss.py =================================================================== --- paramiko-2.5.0.orig/tests/test_kex_gss.py +++ paramiko-2.5.0/tests/test_kex_gss.py @@ -31,7 +31,7 @@ import unittest import paramiko -from .util import needs_gssapi +from .util import needs_gssapi, KerberosTestCase, update_env class NullServer(paramiko.ServerInterface): @@ -53,27 +53,22 @@ class NullServer(paramiko.ServerInterfac return paramiko.OPEN_SUCCEEDED def check_channel_exec_request(self, channel, command): - if command != "yes": + if command != b"yes": return False return True @needs_gssapi -class GSSKexTest(unittest.TestCase): - @staticmethod - def init(username, hostname): - global krb5_principal, targ_name - krb5_principal = username - targ_name = hostname - +class GSSKexTest(KerberosTestCase): def setUp(self): - self.username = krb5_principal - self.hostname = socket.getfqdn(targ_name) + self.username = self.realm.user_princ + self.hostname = socket.getfqdn(self.realm.hostname) self.sockl = socket.socket() - self.sockl.bind((targ_name, 0)) + self.sockl.bind((self.realm.hostname, 0)) self.sockl.listen(1) self.addr, self.port = self.sockl.getsockname() self.event = threading.Event() + update_env(self, self.realm.env) thread = threading.Thread(target=self._run) thread.start() @@ -87,7 +82,7 @@ class GSSKexTest(unittest.TestCase): self.ts = paramiko.Transport(self.socks, gss_kex=True) host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key") self.ts.add_server_key(host_key) - self.ts.set_gss_host(targ_name) + self.ts.set_gss_host(self.realm.hostname) try: self.ts.load_server_moduli() except: @@ -150,6 +145,7 @@ class GSSKexTest(unittest.TestCase): """ self._test_gsskex_and_auth(gss_host=None) + @unittest.expectedFailure # to be investigated, see https://github.com/paramiko/paramiko/issues/1312 def test_gsskex_and_auth_rekey(self): """ Verify that Paramiko can rekey. Index: paramiko-2.5.0/tests/test_ssh_gss.py =================================================================== --- paramiko-2.5.0.orig/tests/test_ssh_gss.py +++ paramiko-2.5.0/tests/test_ssh_gss.py @@ -25,11 +25,10 @@ Unit Tests for the GSS-API / SSPI SSHv2 import socket import threading -import unittest import paramiko -from .util import _support, needs_gssapi +from .util import _support, needs_gssapi, KerberosTestCase, update_env from .test_client import FINGERPRINTS @@ -61,23 +60,24 @@ class NullServer(paramiko.ServerInterfac return paramiko.OPEN_SUCCEEDED def check_channel_exec_request(self, channel, command): - if command != "yes": + if command != b"yes": return False return True @needs_gssapi -class GSSAuthTest(unittest.TestCase): +class GSSAuthTest(KerberosTestCase): def setUp(self): # TODO: username and targ_name should come from os.environ or whatever # the approved pytest method is for runtime-configuring test data. - self.username = "krb5_principal" - self.hostname = socket.getfqdn("targ_name") + self.username = self.realm.user_princ + self.hostname = socket.getfqdn(self.realm.hostname) self.sockl = socket.socket() - self.sockl.bind(("targ_name", 0)) + self.sockl.bind((self.realm.hostname, 0)) self.sockl.listen(1) self.addr, self.port = self.sockl.getsockname() self.event = threading.Event() + update_env(self, self.realm.env) thread = threading.Thread(target=self._run) thread.start() Index: paramiko-2.5.0/tests/util.py =================================================================== --- paramiko-2.5.0.orig/tests/util.py +++ paramiko-2.5.0/tests/util.py @@ -1,19 +1,21 @@ from os.path import dirname, realpath, join +import os +import sys +import unittest import pytest from paramiko.py3compat import builtins +from paramiko.ssh_gss import GSS_AUTH_AVAILABLE def _support(filename): return join(dirname(realpath(__file__)), filename) -# TODO: consider using pytest.importorskip('gssapi') instead? We presumably -# still need CLI configurability for the Kerberos parameters, though, so can't -# JUST key off presence of GSSAPI optional dependency... -# TODO: anyway, s/True/os.environ.get('RUN_GSSAPI', False)/ or something. -needs_gssapi = pytest.mark.skipif(True, reason="No GSSAPI to test") +needs_gssapi = pytest.mark.skipif( + not GSS_AUTH_AVAILABLE, reason="No GSSAPI to test" +) def needs_builtin(name): @@ -25,3 +27,96 @@ def needs_builtin(name): slow = pytest.mark.slow + +# GSSAPI / Kerberos related tests need a working Kerberos environment. +# The class `KerberosTestCase` provides such an environment or skips all tests. +# There are 3 distinct cases: +# +# - A Kerberos environment has already been created and the environment +# contains the required information. +# +# - We can use the package 'k5test' to setup an working kerberos environment on +# the fly. +# +# - We skip all tests. +# +# ToDo: add a Windows specific implementation? + +if ( + os.environ.get("K5TEST_USER_PRINC", None) + and os.environ.get("K5TEST_HOSTNAME", None) + and os.environ.get("KRB5_KTNAME", None) +): # add other vars as needed + + # The environment provides the required information + class DummyK5Realm(object): + def __init__(self): + for k in os.environ: + if not k.startswith("K5TEST_"): + continue + setattr(self, k[7:].lower(), os.environ[k]) + self.env = {} + + class KerberosTestCase(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.realm = DummyK5Realm() + + @classmethod + def tearDownClass(cls): + del cls.realm + + +else: + try: + # Try to setup a kerberos environment + from k5test import KerberosTestCase + except Exception: + # Use a dummy, that skips all tests + class KerberosTestCase(unittest.TestCase): + @classmethod + def setUpClass(cls): + raise unittest.SkipTest( + "Missing extension package k5test. " + 'Please run "pip install k5test" ' + "to install it." + ) + + +def update_env(testcase, mapping, env=os.environ): + """Modify os.environ during a test case and restore during cleanup.""" + saved_env = env.copy() + + def replace(target, source): + target.update(source) + for k in list(target): + if k not in source: + target.pop(k, None) + + testcase.addCleanup(replace, env, saved_env) + env.update(mapping) + return testcase + + +def k5shell(args=None): + """Create a shell with an kerberos environment + + This can be used to debug paramiko or to test the old GSSAPI. + To test a different GSSAPI, simply activate a suitable venv + within the shell. + """ + import k5test + import atexit + import subprocess + + k5 = k5test.K5Realm() + atexit.register(k5.stop) + os.environ.update(k5.env) + for n in ("realm", "user_princ", "hostname"): + os.environ["K5TEST_" + n.upper()] = getattr(k5, n) + + if not args: + args = sys.argv[1:] + if not args: + args = [os.environ.get("SHELL", "bash")] + sys.exit(subprocess.call(args))