Add policy show_password to mask passwords in driver_info

Ironic API already enforces admin role to run node-show. So a new
policy show_password is added to control if plain text passwords
in driver_info should be masked or not before sending back to
API calls. The default is masking password for all cases.

Change-Id: Icd3e6be049376bf7b4468f0c149a72a06643da32
Closes-Bug: #1406191
This commit is contained in:
Zhenzan Zhou 2015-01-28 13:10:02 +08:00 committed by Devananda van der Veen
parent 5a5b72718b
commit efb321c71a
10 changed files with 76 additions and 7 deletions

View File

@ -1,4 +1,5 @@
{
"admin_api": "role:admin or role:administrator",
"show_password": "!",
"default": "rule:admin_api"
}

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import ast
import datetime
from oslo.config import cfg
@ -34,6 +35,7 @@ from ironic.common import states as ir_states
from ironic.common import utils
from ironic import objects
from ironic.openstack.common import log
from ironic.openstack.common import strutils
CONF = cfg.CONF
@ -512,12 +514,16 @@ class Node(base.APIBase):
setattr(self, 'chassis_uuid', kwargs.get('chassis_id', wtypes.Unset))
@staticmethod
def _convert_with_links(node, url, expand=True):
def _convert_with_links(node, url, expand=True, show_password=True):
if not expand:
except_list = ['instance_uuid', 'maintenance', 'power_state',
'provision_state', 'uuid']
node.unset_fields_except(except_list)
else:
if not show_password:
node.driver_info = ast.literal_eval(strutils.mask_password(
node.driver_info,
"******"))
node.ports = [link.Link.make_link('self', url, 'nodes',
node.uuid + "/ports"),
link.Link.make_link('bookmark', url, 'nodes',
@ -542,7 +548,8 @@ class Node(base.APIBase):
assert_juno_provision_state_name(node)
hide_driver_internal_info(node)
return cls._convert_with_links(node, pecan.request.host_url,
expand)
expand,
pecan.request.context.show_password)
@classmethod
def sample(cls, expand=True):

View File

@ -82,10 +82,12 @@ class ContextHook(hooks.PecanHook):
is_admin = (policy.enforce('admin_api', creds, creds) or
policy.enforce('admin', creds, creds))
is_public_api = state.request.environ.get('is_public_api', False)
show_password = policy.enforce('show_password', creds, creds)
state.request.context = context.RequestContext(
is_admin=is_admin,
is_public_api=is_public_api,
show_password=show_password,
**creds)

View File

@ -21,7 +21,7 @@ class RequestContext(context.RequestContext):
def __init__(self, auth_token=None, domain_id=None, domain_name=None,
user=None, tenant=None, is_admin=False, is_public_api=False,
read_only=False, show_deleted=False, request_id=None,
roles=None):
roles=None, show_password=True):
"""Stores several additional request parameters:
:param domain_id: The ID of the domain.
@ -29,12 +29,15 @@ class RequestContext(context.RequestContext):
:param is_public_api: Specifies whether the request should be processed
without authentication.
:param roles: List of user's roles if any.
:param show_password: Specifies whether passwords should be masked
before sending back to API call.
"""
self.is_public_api = is_public_api
self.domain_id = domain_id
self.domain_name = domain_name
self.roles = roles or []
self.show_password = show_password
super(RequestContext, self).__init__(auth_token=auth_token,
user=user, tenant=tenant,
@ -54,6 +57,7 @@ class RequestContext(context.RequestContext):
'domain_id': self.domain_id,
'roles': self.roles,
'domain_name': self.domain_name,
'show_password': self.show_password,
'is_public_api': self.is_public_api}
@classmethod

View File

@ -54,9 +54,11 @@ class FakeRequestState(object):
is_admin = ('admin' in creds['roles'] or
'administrator' in creds['roles'])
is_public_api = self.request.environ.get('is_public_api', False)
show_password = ('admin' in creds['tenant'])
self.request.context = context.RequestContext(
is_admin=is_admin, is_public_api=is_public_api, **creds)
is_admin=is_admin, is_public_api=is_public_api,
show_password=show_password, **creds)
def fake_headers(admin=False):
@ -85,6 +87,8 @@ def fake_headers(admin=False):
'X-Project-Name': 'admin',
'X-Role': '_member_,admin',
'X-Roles': '_member_,admin',
'X-Tenant': 'admin',
'X-Tenant-Name': 'admin',
})
else:
headers.update({
@ -183,6 +187,7 @@ class TestContextHook(base.FunctionalTest):
domain_id=headers['X-User-Domain-Id'],
domain_name=headers['X-User-Domain-Name'],
is_public_api=False,
show_password=False,
is_admin=False,
roles=headers['X-Roles'].split(','))
@ -199,6 +204,7 @@ class TestContextHook(base.FunctionalTest):
domain_id=headers['X-User-Domain-Id'],
domain_name=headers['X-User-Domain-Name'],
is_public_api=False,
show_password=True,
is_admin=True,
roles=headers['X-Roles'].split(','))
@ -216,6 +222,7 @@ class TestContextHook(base.FunctionalTest):
domain_id=headers['X-User-Domain-Id'],
domain_name=headers['X-User-Domain-Name'],
is_public_api=True,
show_password=True,
is_admin=True,
roles=headers['X-Roles'].split(','))
@ -226,6 +233,42 @@ class TestContextHookCompatJuno(TestContextHook):
self.policy = self.useFixture(
policy_fixture.PolicyFixture(compat='juno'))
# override two cases because Juno has no "show_password" policy
@mock.patch.object(context, 'RequestContext')
def test_context_hook_admin(self, mock_ctx):
headers = fake_headers(admin=True)
reqstate = FakeRequestState(headers=headers)
context_hook = hooks.ContextHook(None)
context_hook.before(reqstate)
mock_ctx.assert_called_with(
auth_token=headers['X-Auth-Token'],
user=headers['X-User'],
tenant=headers['X-Tenant'],
domain_id=headers['X-User-Domain-Id'],
domain_name=headers['X-User-Domain-Name'],
is_public_api=False,
show_password=False,
is_admin=True,
roles=headers['X-Roles'].split(','))
@mock.patch.object(context, 'RequestContext')
def test_context_hook_public_api(self, mock_ctx):
headers = fake_headers(admin=True)
env = {'is_public_api': True}
reqstate = FakeRequestState(headers=headers, environ=env)
context_hook = hooks.ContextHook(None)
context_hook.before(reqstate)
mock_ctx.assert_called_with(
auth_token=headers['X-Auth-Token'],
user=headers['X-User'],
tenant=headers['X-Tenant'],
domain_id=headers['X-User-Domain-Id'],
domain_name=headers['X-User-Domain-Name'],
is_public_api=True,
show_password=False,
is_admin=True,
roles=headers['X-Roles'].split(','))
class TestTrustedCallHook(base.FunctionalTest):
def test_trusted_call_hook_not_admin(self):

View File

@ -123,6 +123,8 @@ class TestListNodes(test_api_base.FunctionalTest):
self.assertEqual(node.uuid, data['uuid'])
self.assertIn('driver', data)
self.assertIn('driver_info', data)
self.assertEqual('******', data['driver_info']['fake_password'])
self.assertEqual('bar', data['driver_info']['foo'])
self.assertIn('driver_internal_info', data)
self.assertIn('extra', data)
self.assertIn('properties', data)

View File

@ -167,7 +167,7 @@ def get_test_node(**kw):
"local_gb": "10",
"memory_mb": "4096",
}
fake_info = {"foo": "bar"}
fake_info = {"foo": "bar", "fake_password": "fakepass"}
return {
'id': kw.get('id', 123),
'uuid': kw.get('uuid', '1be26c0b-03f2-4d2e-ae87-c02d7f33c123'),

View File

@ -18,7 +18,8 @@ policy_data = """
"admin_api": "role:admin or role:administrator",
"public_api": "is_public_api:True",
"trusted_call": "rule:admin_api or rule:public_api",
"default": "rule:trusted_call"
"default": "rule:trusted_call",
"show_password": "tenant:admin"
}
"""

View File

@ -63,7 +63,8 @@ class TestNodeObject(base.DbTestCase):
autospec=True) as mock_update_node:
n = objects.Node.get(self.context, uuid)
self.assertEqual({"foo": "bar"}, n.driver_internal_info)
self.assertEqual({"foo": "bar", "fake_password": "fakepass"},
n.driver_internal_info)
n.properties = {"fake": "property"}
n.driver = "fake-driver"
n.save()

View File

@ -43,6 +43,10 @@ class PolicyTestCase(base.TestCase):
for c in creds:
self.assertTrue(policy.enforce('trusted_call', c, c))
def test_show_password(self):
creds = {'roles': [u'admin'], 'tenant': 'admin'}
self.assertTrue(policy.enforce('show_password', creds, creds))
class PolicyTestCaseNegative(base.TestCase):
"""Tests whether the configuration of the policy engine is corect."""
@ -64,3 +68,7 @@ class PolicyTestCaseNegative(base.TestCase):
for c in creds:
self.assertFalse(policy.enforce('trusted_call', c, c))
def test_show_password(self):
creds = {'roles': [u'admin'], 'tenant': 'demo'}
self.assertFalse(policy.enforce('show_password', creds, creds))