Merge pull request #73 from rbcarson/windows-principal-workaround-71

Block use of `principal` argument when using kerberos-sspi. Fixes #71.
This commit is contained in:
Cory Benfield 2016-05-17 22:43:09 +01:00
commit b87f71b90e
3 changed files with 67 additions and 20 deletions

View File

@ -131,6 +131,10 @@ builds). An explicit principal can be specified with the ``principal`` arg:
>>> kerberos_auth = HTTPKerberosAuth(principal="user@REALM")
>>> r = requests.get("http://example.org", auth=kerberos_auth)
...
**Windows users:** Explicit Principal is currently not supported when using
``kerberos-sspi``. Providing a value for ``principal`` in this scenario will raise
``NotImplementedError``.
Logging
-------

View File

@ -1,7 +1,9 @@
try:
import kerberos
using_kerberos_sspi = False
except ImportError:
import kerberos_sspi as kerberos
using_kerberos_sspi = True
import re
import logging
@ -95,6 +97,7 @@ class HTTPKerberosAuth(AuthBase):
self.principal = principal
self.hostname_override = hostname_override
self.sanitize_mutual_error_response = sanitize_mutual_error_response
self._using_kerberos_sspi = using_kerberos_sspi
def generate_request_header(self, response, host, is_preemptive=False):
"""
@ -118,9 +121,16 @@ class HTTPKerberosAuth(AuthBase):
# w/ name-based HTTP hosting)
kerb_host = self.hostname_override if self.hostname_override is not None else host
kerb_spn = "{0}@{1}".format(self.service, kerb_host)
kwargs = {}
# kerberos-sspi: Never pass principal. Raise if user tries to specify one.
if not self._using_kerberos_sspi:
kwargs['principal'] = self.principal
elif self.principal:
raise NotImplementedError("Can't use 'principal' argument with kerberos-sspi.")
result, self.context[host] = kerberos.authGSSClientInit(kerb_spn,
gssflags=gssflags, principal=self.principal)
gssflags=gssflags, **kwargs)
if result < 1:
raise EnvironmentError(result, kerb_stage)

View File

@ -54,6 +54,12 @@ class KerberosTestCase(unittest.TestCase):
clientStep_error.reset_mock()
clientStep_exception.reset_mock()
clientResponse.reset_mock()
# When using kerberos-sspi, we never pass principal to authGSSClientInit().
# This affects our repeated use of assert_called_with().
self.clientInit_default_principal = {'principal': None}
if kerberos_module_name == 'kerberos_sspi':
self.clientInit_default_principal = {}
def tearDown(self):
"""Teardown."""
@ -120,7 +126,7 @@ class KerberosTestCase(unittest.TestCase):
gssflags=(
kerberos.GSS_C_MUTUAL_FLAG |
kerberos.GSS_C_SEQUENCE_FLAG),
principal=None)
**self.clientInit_default_principal)
clientStep_continue.assert_called_with("CTX", "token")
clientResponse.assert_called_with("CTX")
@ -142,7 +148,7 @@ class KerberosTestCase(unittest.TestCase):
gssflags=(
kerberos.GSS_C_MUTUAL_FLAG |
kerberos.GSS_C_SEQUENCE_FLAG),
principal=None)
**self.clientInit_default_principal)
self.assertFalse(clientStep_continue.called)
self.assertFalse(clientResponse.called)
@ -164,7 +170,7 @@ class KerberosTestCase(unittest.TestCase):
gssflags=(
kerberos.GSS_C_MUTUAL_FLAG |
kerberos.GSS_C_SEQUENCE_FLAG),
principal=None)
**self.clientInit_default_principal)
clientStep_error.assert_called_with("CTX", "token")
self.assertFalse(clientResponse.called)
@ -209,7 +215,7 @@ class KerberosTestCase(unittest.TestCase):
gssflags=(
kerberos.GSS_C_MUTUAL_FLAG |
kerberos.GSS_C_SEQUENCE_FLAG),
principal=None)
**self.clientInit_default_principal)
clientStep_continue.assert_called_with("CTX", "token")
clientResponse.assert_called_with("CTX")
@ -254,7 +260,7 @@ class KerberosTestCase(unittest.TestCase):
gssflags=(
kerberos.GSS_C_MUTUAL_FLAG |
kerberos.GSS_C_SEQUENCE_FLAG),
principal=None)
**self.clientInit_default_principal)
clientStep_continue.assert_called_with("CTX", "token")
clientResponse.assert_called_with("CTX")
@ -495,7 +501,7 @@ class KerberosTestCase(unittest.TestCase):
gssflags=(
kerberos.GSS_C_MUTUAL_FLAG |
kerberos.GSS_C_SEQUENCE_FLAG),
principal=None)
**self.clientInit_default_principal)
clientStep_continue.assert_called_with("CTX", "token")
clientResponse.assert_called_with("CTX")
@ -545,7 +551,7 @@ class KerberosTestCase(unittest.TestCase):
gssflags=(
kerberos.GSS_C_MUTUAL_FLAG |
kerberos.GSS_C_SEQUENCE_FLAG),
principal=None)
**self.clientInit_default_principal)
clientStep_continue.assert_called_with("CTX", "token")
clientResponse.assert_called_with("CTX")
@ -565,10 +571,10 @@ class KerberosTestCase(unittest.TestCase):
gssflags=(
kerberos.GSS_C_MUTUAL_FLAG |
kerberos.GSS_C_SEQUENCE_FLAG),
principal=None)
**self.clientInit_default_principal)
def test_delegation(self):
with patch.multiple('kerberos',
with patch.multiple(kerberos_module_name,
authGSSClientInit=clientInit_complete,
authGSSClientResponse=clientResponse,
authGSSClientStep=clientStep_continue):
@ -609,7 +615,7 @@ class KerberosTestCase(unittest.TestCase):
kerberos.GSS_C_MUTUAL_FLAG |
kerberos.GSS_C_SEQUENCE_FLAG |
kerberos.GSS_C_DELEG_FLAG),
principal=None
**self.clientInit_default_principal
)
clientStep_continue.assert_called_with("CTX", "token")
clientResponse.assert_called_with("CTX")
@ -624,13 +630,18 @@ class KerberosTestCase(unittest.TestCase):
response.headers = {'www-authenticate': 'negotiate token'}
host = urlparse(response.url).hostname
auth = requests_kerberos.HTTPKerberosAuth(principal="user@REALM")
auth.generate_request_header(response, host),
clientInit_complete.assert_called_with(
"HTTP@www.example.org",
gssflags=(
kerberos.GSS_C_MUTUAL_FLAG |
kerberos.GSS_C_SEQUENCE_FLAG),
principal="user@REALM")
try:
auth.generate_request_header(response, host)
clientInit_complete.assert_called_with(
"HTTP@www.example.org",
gssflags=(
kerberos.GSS_C_MUTUAL_FLAG |
kerberos.GSS_C_SEQUENCE_FLAG),
principal="user@REALM")
except NotImplementedError:
# principal is not supported with kerberos-sspi.
if not auth._using_kerberos_sspi:
raise
def test_realm_override(self):
with patch.multiple(kerberos_module_name,
@ -642,13 +653,35 @@ class KerberosTestCase(unittest.TestCase):
response.headers = {'www-authenticate': 'negotiate token'}
host = urlparse(response.url).hostname
auth = requests_kerberos.HTTPKerberosAuth(hostname_override="otherhost.otherdomain.org")
auth.generate_request_header(response, host),
auth.generate_request_header(response, host)
clientInit_complete.assert_called_with(
"HTTP@otherhost.otherdomain.org",
gssflags=(
kerberos.GSS_C_MUTUAL_FLAG |
kerberos.GSS_C_SEQUENCE_FLAG),
principal=None)
**self.clientInit_default_principal)
def test_kerberos_sspi_reject_principal(self):
with patch.multiple(kerberos_module_name,
authGSSClientInit=clientInit_complete,
authGSSClientResponse=clientResponse,
authGSSClientStep=clientStep_continue):
response = requests.Response()
response.url = "http://www.example.org/"
host = urlparse(response.url).hostname
auth = requests_kerberos.HTTPKerberosAuth(principal="user@REALM")
auth._using_kerberos_sspi = True
self.assertRaises(NotImplementedError, auth.generate_request_header, response, host)
auth = requests_kerberos.HTTPKerberosAuth(principal=None)
auth._using_kerberos_sspi = True
auth.generate_request_header(response, host)
clientInit_complete.assert_called_with(
"HTTP@www.example.org",
gssflags=(
kerberos.GSS_C_MUTUAL_FLAG |
kerberos.GSS_C_SEQUENCE_FLAG))
if __name__ == '__main__':