diff --git a/keystone/common/ldap/core.py b/keystone/common/ldap/core.py index 26338847d3..f238662f19 100644 --- a/keystone/common/ldap/core.py +++ b/keystone/common/ldap/core.py @@ -582,7 +582,8 @@ def _common_ldap_initialization(url, use_tls=False, tls_cacertfile=None, if use_tls and using_ldaps: raise AssertionError(_('Invalid TLS / LDAPS combination')) - if use_tls: + # The certificate trust options apply for both LDAPS and TLS. + if use_tls or using_ldaps: if not ldap.TLS_AVAIL: raise ValueError(_('Invalid LDAP TLS_AVAIL option: %s. TLS ' 'not available') % ldap.TLS_AVAIL) diff --git a/keystone/tests/unit/common/test_ldap.py b/keystone/tests/unit/common/test_ldap.py index 759460cf2c..8a50be050c 100644 --- a/keystone/tests/unit/common/test_ldap.py +++ b/keystone/tests/unit/common/test_ldap.py @@ -10,15 +10,23 @@ # License for the specific language governing permissions and limitations # under the License. -import ldap.dn +import ldap +import mock from testtools import matchers +import os +import shutil +import tempfile + from keystone.common import ldap as ks_ldap from keystone.common.ldap import core as common_ldap_core +from keystone import config from keystone import tests from keystone.tests import default_fixtures from keystone.tests import fakeldap +CONF = config.CONF + class DnCompareTest(tests.BaseTestCase): """Tests for the DN comparison functions in keystone.common.ldap.core.""" @@ -260,3 +268,72 @@ class LDAPDeleteTreeTest(tests.TestCase): conn.search_s, child_dn, ldap.SCOPE_BASE) self.assertRaises(ldap.NO_SUCH_OBJECT, conn.search_s, grandchild_dn, ldap.SCOPE_BASE) + + +class SslTlsTest(tests.TestCase): + """Tests for the SSL/TLS functionality in keystone.common.ldap.core.""" + + @mock.patch.object(ks_ldap.core.KeystoneLDAPHandler, 'simple_bind_s') + @mock.patch.object(ldap.ldapobject.LDAPObject, 'start_tls_s') + def _init_ldap_connection(self, config, mock_ldap_one, mock_ldap_two): + # Attempt to connect to initialize python-ldap. + base_ldap = ks_ldap.BaseLdap(config) + base_ldap.get_connection() + + def test_certfile_trust_tls(self): + # We need this to actually exist, so we create a tempfile. + (handle, certfile) = tempfile.mkstemp() + self.addCleanup(os.unlink, certfile) + self.addCleanup(os.close, handle) + self.config_fixture.config(group='ldap', + url='ldap://localhost', + use_tls=True, + tls_cacertfile=certfile) + + self._init_ldap_connection(CONF) + + # Ensure the cert trust option is set. + self.assertEqual(certfile, ldap.get_option(ldap.OPT_X_TLS_CACERTFILE)) + + def test_certdir_trust_tls(self): + # We need this to actually exist, so we create a tempdir. + certdir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, certdir) + self.config_fixture.config(group='ldap', + url='ldap://localhost', + use_tls=True, + tls_cacertdir=certdir) + + self._init_ldap_connection(CONF) + + # Ensure the cert trust option is set. + self.assertEqual(certdir, ldap.get_option(ldap.OPT_X_TLS_CACERTDIR)) + + def test_certfile_trust_ldaps(self): + # We need this to actually exist, so we create a tempfile. + (handle, certfile) = tempfile.mkstemp() + self.addCleanup(os.unlink, certfile) + self.addCleanup(os.close, handle) + self.config_fixture.config(group='ldap', + url='ldaps://localhost', + use_tls=False, + tls_cacertfile=certfile) + + self._init_ldap_connection(CONF) + + # Ensure the cert trust option is set. + self.assertEqual(certfile, ldap.get_option(ldap.OPT_X_TLS_CACERTFILE)) + + def test_certdir_trust_ldaps(self): + # We need this to actually exist, so we create a tempdir. + certdir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, certdir) + self.config_fixture.config(group='ldap', + url='ldaps://localhost', + use_tls=False, + tls_cacertdir=certdir) + + self._init_ldap_connection(CONF) + + # Ensure the cert trust option is set. + self.assertEqual(certdir, ldap.get_option(ldap.OPT_X_TLS_CACERTDIR))