Update sslutils.wrap for newer Pythons

Make use of SSLContext to configure and then wrap sockets.

For Pythons >= 3.12 ssl.wrap_socket was been remove and this
is the only approach for configuring SSL options.

This should be compatible with Python 3.6 or later.

Change-Id: Ie6632c60db2bda10ad13a2ac2931b88ad52c10f4
This commit is contained in:
James Page 2024-02-28 12:13:33 +00:00 committed by Sharpz7
parent bc38e43044
commit 5810d016e1
2 changed files with 51 additions and 43 deletions

View File

@ -77,28 +77,27 @@ def is_enabled(conf):
def wrap(conf, sock):
conf.register_opts(_options.ssl_opts, config_section)
ssl_kwargs = {
'server_side': True,
'certfile': conf.ssl.cert_file,
'keyfile': conf.ssl.key_file,
'cert_reqs': ssl.CERT_NONE,
}
ssl_version = ssl.PROTOCOL_TLS_SERVER
if conf.ssl.version:
key = conf.ssl.version.lower()
try:
ssl_version = _SSL_PROTOCOLS[key]
except KeyError:
raise RuntimeError(
_("Invalid SSL version : %s") % conf.ssl.version)
context = ssl.SSLContext(ssl_version)
context.load_cert_chain(conf.ssl.cert_file, conf.ssl.key_file)
if conf.ssl.ca_file:
ssl_kwargs['ca_certs'] = conf.ssl.ca_file
ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(conf.ssl.ca_file)
else:
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
if conf.ssl.version:
key = conf.ssl.version.lower()
try:
ssl_kwargs['ssl_version'] = _SSL_PROTOCOLS[key]
except KeyError:
raise RuntimeError(
_("Invalid SSL version : %s") % conf.ssl.version)
if conf.ssl.ciphers:
context.set_ciphers(conf.ssl.ciphers)
if conf.ssl.ciphers:
ssl_kwargs['ciphers'] = conf.ssl.ciphers
# NOTE(eezhova): SSL/TLS protocol version is injected in ssl_kwargs above,
# so skipping bandit check
return ssl.wrap_socket(sock, **ssl_kwargs) # nosec
return context.wrap_socket(sock, server_side=True)

View File

@ -78,24 +78,40 @@ class SslutilsTestCase(base.ServiceBaseTestCase):
group=sslutils.config_section)
self.assertRaises(RuntimeError, sslutils.is_enabled, self.conf)
@mock.patch("ssl.wrap_socket")
@mock.patch("ssl.SSLContext")
@mock.patch("os.path.exists")
def _test_wrap(self, exists_mock, wrap_socket_mock, **kwargs):
def _test_wrap(self, exists_mock, ssl_context_mock,
ca_file=None,
ciphers=None,
ssl_version=None):
exists_mock.return_value = True
sock = mock.Mock()
context = mock.Mock()
ssl_context_mock.return_value = context
self.conf.set_default("cert_file", self.cert_file_name,
group=sslutils.config_section)
self.conf.set_default("key_file", self.key_file_name,
group=sslutils.config_section)
ssl_kwargs = {'server_side': True,
'certfile': self.conf.ssl.cert_file,
'keyfile': self.conf.ssl.key_file,
'cert_reqs': ssl.CERT_NONE,
}
if kwargs:
ssl_kwargs.update(**kwargs)
sslutils.wrap(self.conf, sock)
wrap_socket_mock.assert_called_once_with(sock, **ssl_kwargs)
ssl_version = ssl_version or ssl.PROTOCOL_TLS_SERVER
ssl_context_mock.assert_called_once_with(ssl_version)
context.load_cert_chain.assert_called_once_with(
self.conf.ssl.cert_file,
self.conf.ssl.key_file,
)
if ca_file:
self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
context.load_verify_locations.assert_called_once_with(
ca_file
)
else:
self.assertEqual(context.verify_mode, ssl.CERT_NONE)
self.assertFalse(context.check_hostname)
if ciphers:
context.set_ciphers.assert_called_once_with(
ciphers
)
context.wrap_socket.assert_called_once_with(sock, server_side=True)
def test_wrap(self):
self._test_wrap()
@ -103,10 +119,7 @@ class SslutilsTestCase(base.ServiceBaseTestCase):
def test_wrap_ca_file(self):
self.conf.set_default("ca_file", self.ca_file_name,
group=sslutils.config_section)
ssl_kwargs = {'ca_certs': self.conf.ssl.ca_file,
'cert_reqs': ssl.CERT_REQUIRED
}
self._test_wrap(**ssl_kwargs)
self._test_wrap(ca_file=self.conf.ssl.ca_file)
def test_wrap_ciphers(self):
self.conf.set_default("ca_file", self.ca_file_name,
@ -118,17 +131,13 @@ class SslutilsTestCase(base.ServiceBaseTestCase):
)
self.conf.set_default("ciphers", ciphers,
group=sslutils.config_section)
ssl_kwargs = {'ca_certs': self.conf.ssl.ca_file,
'cert_reqs': ssl.CERT_REQUIRED,
'ciphers': ciphers}
self._test_wrap(**ssl_kwargs)
self._test_wrap(ca_file=self.conf.ssl.ca_file,
ciphers=self.conf.ssl.ciphers)
def test_wrap_ssl_version(self):
self.conf.set_default("ca_file", self.ca_file_name,
group=sslutils.config_section)
self.conf.set_default("version", "tlsv1",
group=sslutils.config_section)
ssl_kwargs = {'ca_certs': self.conf.ssl.ca_file,
'cert_reqs': ssl.CERT_REQUIRED,
'ssl_version': ssl.PROTOCOL_TLSv1}
self._test_wrap(**ssl_kwargs)
self._test_wrap(ca_file=self.conf.ssl.ca_file,
ssl_version=ssl.PROTOCOL_TLSv1)