xen: mask passwords in volume connection_data dict

The connection_data dict can have credentials in it, so we need to scrub
those before putting the stringified dict into the StorageError message
and raising that up and when logging the dict.

Note that strutils.mask_password converts the dict to a string using
six.text_type so we don't have to do that conversion first.

SecurityImpact

Change-Id: Ic5f4d4c26794550a92481bf2b725ef5eafa581b2
Closes-Bug: #1516765
(cherry picked from commit 8b289237ed)
(cherry picked from commit cf197ec2d6)
This commit is contained in:
Matt Riedemann 2015-11-16 13:11:09 -08:00 committed by Tony Breeds
parent fc932f1fbc
commit ef1ccdaca9
4 changed files with 38 additions and 4 deletions

View File

@ -15,6 +15,7 @@
from eventlet import greenthread
import mock
import six
from nova import exception
from nova import test
@ -168,14 +169,26 @@ class ParseVolumeInfoTestCase(stubs.XenAPITestBaseNoDB):
'target_lun': None,
'auth_method': 'CHAP',
'auth_username': 'username',
'auth_password': 'password'}}
'auth_password': 'verybadpass'}}
def test_parse_volume_info_parsing_auth_details(self):
conn_info = self._make_connection_info()
result = volume_utils._parse_volume_info(conn_info['data'])
self.assertEqual('username', result['chapuser'])
self.assertEqual('password', result['chappassword'])
self.assertEqual('verybadpass', result['chappassword'])
def test_parse_volume_info_missing_details(self):
# Tests that a StorageError is raised if volume_id, target_host, or
# target_ign is missing from connection_data. Also ensures that the
# auth_password value is not present in the StorageError message.
for data_key_to_null in ('volume_id', 'target_portal', 'target_iqn'):
conn_info = self._make_connection_info()
conn_info['data'][data_key_to_null] = None
ex = self.assertRaises(exception.StorageError,
volume_utils._parse_volume_info,
conn_info['data'])
self.assertNotIn('verybadpass', six.text_type(ex))
def test_get_device_number_raise_exception_on_wrong_mountpoint(self):
self.assertRaises(

View File

@ -381,6 +381,22 @@ class AttachVolumeTestCase(VolumeOpsTestBase):
mock_intro.assert_called_once_with(self.session, "sr",
target_lun="lun")
@mock.patch.object(volume_utils, "introduce_vdi")
@mock.patch.object(volumeops.LOG, 'debug')
def test_connect_hypervisor_to_volume_mask_password(self, mock_debug,
mock_intro):
# Tests that the connection_data is scrubbed before logging.
data = {'auth_password': 'verybadpass'}
self.ops._connect_hypervisor_to_volume("sr", data)
self.assertTrue(mock_debug.called, 'LOG.debug was not called')
password_logged = False
for call in mock_debug.call_args_list:
# The call object is a tuple of (args, kwargs)
if 'verybadpass' in call[0]:
password_logged = True
break
self.assertFalse(password_logged, 'connection_data was not scrubbed')
@mock.patch.object(vm_utils, "is_vm_shutdown")
@mock.patch.object(vm_utils, "create_vbd")
def test_attach_volume_to_vm_plug(self, mock_vbd, mock_shutdown):

View File

@ -24,6 +24,7 @@ import string
from eventlet import greenthread
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import strutils
from nova import exception
from nova.i18n import _, _LE, _LW
@ -84,7 +85,7 @@ def _parse_volume_info(connection_data):
target_iqn is None):
raise exception.StorageError(
reason=_('Unable to obtain target information %s') %
connection_data)
strutils.mask_password(connection_data))
volume_info = {}
volume_info['id'] = volume_id
volume_info['target'] = target_host

View File

@ -19,6 +19,7 @@ Management class for Storage-related functions (attach, detach, etc).
from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import strutils
from nova import exception
from nova.i18n import _LI, _LW
@ -91,7 +92,10 @@ class VolumeOps(object):
return (sr_ref, sr_uuid)
def _connect_hypervisor_to_volume(self, sr_ref, connection_data):
LOG.debug("Connect volume to hypervisor: %s", connection_data)
# connection_data can have credentials in it so make sure to scrub
# those before logging.
LOG.debug("Connect volume to hypervisor: %s",
strutils.mask_password(connection_data))
if 'vdi_uuid' in connection_data:
vdi_ref = volume_utils.introduce_vdi(
self._session, sr_ref,