Mask block_device_info auth_password in virt driver debug logs

The block_device_info object can have an auth_password key which is
getting logged at debug level in several virt drivers so we need to
sanitize the message getting logged.

Adds tests to ensure the logged messages are properly sanitized.

Note that bug 1321785 was opened to track the long-term design issues
with storing the password in the block_device_info dict since this can
crop up elsewhere if it's logged.  The immediate fix here is to mask
what's already exposed.

Closes-Bug: #1319943

(cherry picked from commit 5dda3a6ab2)

Conflicts:
	nova/tests/virt/libvirt/test_libvirt.py
	nova/tests/virt/vmwareapi/test_vmops.py

Change-Id: I0eae07ce3f0f39861eb97ec3dec44895386c7d04
This commit is contained in:
Matt Riedemann 2014-05-15 12:22:19 -07:00 committed by Yaguang Tang
parent 6fb765ccb5
commit 1408081296
6 changed files with 116 additions and 12 deletions

View File

@ -777,6 +777,33 @@ class LibvirtConnTestCase(test.TestCase):
self.assertEqual(['fake_registerErrorHandler',
'fake_get_host_capabilities'], calls)
def test_sanitize_log_to_xml(self):
# setup fake data
data = {'auth_password': 'scrubme'}
bdm = [{'connection_info': {'data': data}}]
bdi = {'block_device_mapping': bdm}
# Tests that the parameters to the to_xml method are sanitized for
# passwords when logged.
def fake_debug(*args, **kwargs):
if 'auth_password' in args[0]:
self.assertNotIn('scrubme', args[0])
conn = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), True)
conf = mock.Mock()
with contextlib.nested(
mock.patch.object(libvirt_driver.LOG, 'debug',
side_effect=fake_debug),
mock.patch.object(conn, 'get_guest_config', return_value=conf)
) as (
debug_mock, conf_mock
):
conn.to_xml(self.context, self.test_instance, network_info={},
disk_info={}, image_meta={}, block_device_info=bdi)
# we don't care what the log message is, we just want to make sure
# our stub method is called which asserts the password is scrubbed
self.assertTrue(debug_mock.called)
def test_close_callback(self):
self.close_callback = None

View File

@ -368,3 +368,28 @@ class VMwareVMOpsTestCase(test.NoDBTestCase):
def test_get_datacenter_ref_and_name_with_no_datastore(self):
self._test_get_datacenter_ref_and_name()
def test_spawn_mask_block_device_info_password(self):
# Very simple test that just ensures block_device_info auth_password
# is masked when logged; the rest of the test just fails out early.
data = {'auth_password': 'scrubme'}
bdm = [{'connection_info': {'data': data}}]
bdi = {'block_device_mapping': bdm}
# Tests that the parameters to the to_xml method are sanitized for
# passwords when logged.
def fake_debug(*args, **kwargs):
if 'auth_password' in args[0]:
self.assertNotIn('scrubme', args[0])
with mock.patch.object(vmops.LOG, 'debug',
side_effect=fake_debug) as debug_mock:
# the invalid disk format will cause an exception
image_meta = {'disk_format': 'fake'}
self.assertRaises(exception.InvalidDiskFormat, self._vmops.spawn,
self._context, self._instance, image_meta,
injected_files=None, admin_password=None,
network_info=[], block_device_info=bdi)
# we don't care what the log message is, we just want to make sure
# our stub method is called which asserts the password is scrubbed
self.assertTrue(debug_mock.called)

View File

@ -31,7 +31,9 @@ from nova.openstack.common.gettextutils import _
from nova.openstack.common import processutils
from nova.openstack.common import timeutils
from nova.openstack.common import units
from nova.openstack.common import uuidutils
from nova import test
from nova.tests import fake_instance
from nova.tests.virt.xenapi import stubs
from nova.tests.virt.xenapi import test_xenapi
from nova import utils
@ -616,6 +618,49 @@ class CheckVDISizeTestCase(VMUtilsTestBase):
self.vdi_uuid)
class GetVdisForInstanceTestCase(VMUtilsTestBase):
"""Tests get_vdis_for_instance utility method."""
def setUp(self):
super(GetVdisForInstanceTestCase, self).setUp()
self.context = context.get_admin_context()
self.context.auth_token = 'auth_token'
self.session = FakeSession()
self.instance = fake_instance.fake_instance_obj(self.context)
self.name_label = 'name'
self.image = 'fake_image_id'
@mock.patch.object(vm_utils, 'get_vdi_uuid_for_volume',
return_value=uuidutils.generate_uuid())
def test_vdis_for_instance_bdi_password_scrubbed(self, get_uuid_mock):
# setup fake data
data = {'name_label': self.name_label,
'sr_uuid': 'fake',
'auth_password': 'scrubme'}
bdm = [{'mount_device': '/dev/vda',
'connection_info': {'data': data}}]
bdi = {'root_device_name': 'vda',
'block_device_mapping': bdm}
# Tests that the parameters to the to_xml method are sanitized for
# passwords when logged.
def fake_debug(*args, **kwargs):
if 'auth_password' in args[0]:
self.assertNotIn('scrubme', args[0])
with mock.patch.object(vm_utils.LOG, 'debug',
side_effect=fake_debug) as debug_mock:
vdis = vm_utils.get_vdis_for_instance(self.context, self.session,
self.instance,
self.name_label, self.image,
image_type=4,
block_device_info=bdi)
self.assertEqual(1, len(vdis))
get_uuid_mock.assert_called_once_with(self.session, data)
# we don't care what the log message is, we just want to make sure
# our stub method is called which asserts the password is scrubbed
self.assertTrue(debug_mock.called)
class GetInstanceForVdisForSrTestCase(VMUtilsTestBase):
def setUp(self):
super(GetInstanceForVdisForSrTestCase, self).setUp()

View File

@ -3457,15 +3457,17 @@ class LibvirtDriver(driver.ComputeDriver):
# this ahead of time so that we don't acquire it while also
# holding the logging lock.
network_info_str = str(network_info)
LOG.debug(_('Start to_xml '
'network_info=%(network_info)s '
'disk_info=%(disk_info)s '
'image_meta=%(image_meta)s rescue=%(rescue)s'
'block_device_info=%(block_device_info)s'),
{'network_info': network_info_str, 'disk_info': disk_info,
'image_meta': image_meta, 'rescue': rescue,
'block_device_info': block_device_info},
instance=instance)
msg = ('Start to_xml '
'network_info=%(network_info)s '
'disk_info=%(disk_info)s '
'image_meta=%(image_meta)s rescue=%(rescue)s '
'block_device_info=%(block_device_info)s' %
{'network_info': network_info_str, 'disk_info': disk_info,
'image_meta': image_meta, 'rescue': rescue,
'block_device_info': block_device_info})
# NOTE(mriedem): block_device_info can contain auth_password so we
# need to sanitize the password in the message.
LOG.debug(logging.mask_password(msg), instance=instance)
conf = self.get_guest_config(instance, network_info, image_meta,
disk_info, rescue, block_device_info)
xml = conf.to_xml()

View File

@ -195,8 +195,10 @@ class VMwareVMOps(object):
"""
ebs_root = False
if block_device_info:
LOG.debug(_("Block device information present: %s")
% block_device_info, instance=instance)
msg = "Block device information present: %s" % block_device_info
# NOTE(mriedem): block_device_info can contain an auth_password
# so we have to scrub the message before logging it.
LOG.debug(logging.mask_password(msg), instance=instance)
block_device_mapping = driver.block_device_info_get_mapping(
block_device_info)
if block_device_mapping:

View File

@ -577,7 +577,10 @@ def get_vdis_for_instance(context, session, instance, name_label, image,
vdis = {}
if block_device_info:
LOG.debug(_("block device info: %s"), block_device_info)
msg = "block device info: %s" % block_device_info
# NOTE(mriedem): block_device_info can contain an auth_password
# so we have to scrub the message before logging it.
LOG.debug(logging.mask_password(msg), instance=instance)
root_device_name = block_device_info['root_device_name']
for bdm in block_device_info['block_device_mapping']: