Make neutron context available in neutron-lib

Even though the context was marked private, we should
consider it public as it is widely used across the
board, and it has demonstrated itself to be one of
the most stable components ever conceived in the
Neutron maze.

NeutronLibImpact

Partially-implements: blueprint neutron-lib

Change-Id: Ibb3797c17fef5a1ead40aa2cad1269ce11c45339
This commit is contained in:
Armando Migliaccio 2016-10-17 18:05:21 -07:00
parent 0f65ee435d
commit 6d5e550446
5 changed files with 51 additions and 40 deletions

View File

@ -12,7 +12,7 @@
import sqlalchemy as sa
from neutron_lib import _context as context
from neutron_lib import context
from neutron_lib.db import model_base
from neutron_lib.tests.unit.db import _base as db_base

View File

@ -14,7 +14,7 @@ import mock
from oslo_context import context as oslo_context
from testtools import matchers
from neutron_lib import _context
from neutron_lib import context
from neutron_lib.tests import _base
@ -27,7 +27,7 @@ class TestNeutronContext(_base.BaseTestCase):
self.db_api_session = self._db_api_session_patcher.start()
def test_neutron_context_create(self):
ctx = _context.Context('user_id', 'tenant_id')
ctx = context.Context('user_id', 'tenant_id')
self.assertEqual('user_id', ctx.user_id)
self.assertEqual('tenant_id', ctx.project_id)
self.assertEqual('tenant_id', ctx.tenant_id)
@ -42,7 +42,7 @@ class TestNeutronContext(_base.BaseTestCase):
self.assertIsNone(ctx.auth_token)
def test_neutron_context_getter_setter(self):
ctx = _context.Context('Anakin', 'Skywalker')
ctx = context.Context('Anakin', 'Skywalker')
self.assertEqual('Anakin', ctx.user_id)
self.assertEqual('Skywalker', ctx.tenant_id)
ctx.user_id = 'Darth'
@ -51,9 +51,8 @@ class TestNeutronContext(_base.BaseTestCase):
self.assertEqual('Vader', ctx.tenant_id)
def test_neutron_context_create_with_name(self):
ctx = _context.Context('user_id', 'tenant_id',
tenant_name='tenant_name',
user_name='user_name')
ctx = context.Context('user_id', 'tenant_id',
tenant_name='tenant_name', user_name='user_name')
# Check name is set
self.assertEqual('user_name', ctx.user_name)
self.assertEqual('tenant_name', ctx.tenant_name)
@ -62,32 +61,32 @@ class TestNeutronContext(_base.BaseTestCase):
self.assertEqual('tenant_id', ctx.tenant)
def test_neutron_context_create_with_request_id(self):
ctx = _context.Context('user_id', 'tenant_id', request_id='req_id_xxx')
ctx = context.Context('user_id', 'tenant_id', request_id='req_id_xxx')
self.assertEqual('req_id_xxx', ctx.request_id)
def test_neutron_context_create_with_timestamp(self):
now = "Right Now!"
ctx = _context.Context('user_id', 'tenant_id', timestamp=now)
ctx = context.Context('user_id', 'tenant_id', timestamp=now)
self.assertEqual(now, ctx.timestamp)
def test_neutron_context_create_is_advsvc(self):
ctx = _context.Context('user_id', 'tenant_id', is_advsvc=True)
ctx = context.Context('user_id', 'tenant_id', is_advsvc=True)
self.assertFalse(ctx.is_admin)
self.assertTrue(ctx.is_advsvc)
def test_neutron_context_create_with_auth_token(self):
ctx = _context.Context('user_id', 'tenant_id',
auth_token='auth_token_xxx')
ctx = context.Context('user_id', 'tenant_id',
auth_token='auth_token_xxx')
self.assertEqual('auth_token_xxx', ctx.auth_token)
def test_neutron_context_from_dict(self):
owner = {'user_id': 'Luke', 'tenant_id': 'Skywalker'}
ctx = _context.Context.from_dict(owner)
ctx = context.Context.from_dict(owner)
self.assertEqual(owner['user_id'], ctx.user_id)
self.assertEqual(owner['tenant_id'], ctx.tenant_id)
def test_neutron_context_to_dict(self):
ctx = _context.Context('user_id', 'tenant_id')
ctx = context.Context('user_id', 'tenant_id')
ctx_dict = ctx.to_dict()
self.assertEqual('user_id', ctx_dict['user_id'])
self.assertEqual('tenant_id', ctx_dict['project_id'])
@ -100,23 +99,23 @@ class TestNeutronContext(_base.BaseTestCase):
self.assertIsNone(ctx_dict['auth_token'])
def test_neutron_context_to_dict_with_name(self):
ctx = _context.Context('user_id', 'tenant_id',
tenant_name='tenant_name',
user_name='user_name')
ctx = context.Context('user_id', 'tenant_id',
tenant_name='tenant_name',
user_name='user_name')
ctx_dict = ctx.to_dict()
self.assertEqual('user_name', ctx_dict['user_name'])
self.assertEqual('tenant_name', ctx_dict['tenant_name'])
self.assertEqual('tenant_name', ctx_dict['project_name'])
def test_neutron_context_to_dict_with_auth_token(self):
ctx = _context.Context('user_id', 'tenant_id',
auth_token='auth_token_xxx')
ctx = context.Context('user_id', 'tenant_id',
auth_token='auth_token_xxx')
ctx_dict = ctx.to_dict()
self.assertEqual('auth_token_xxx', ctx_dict['auth_token'])
def test_neutron_context_admin_to_dict(self):
self.db_api_session.return_value = 'fakesession'
ctx = _context.get_admin_context()
ctx = context.get_admin_context()
ctx_dict = ctx.to_dict()
self.assertIsNone(ctx_dict['user_id'])
self.assertIsNone(ctx_dict['tenant_id'])
@ -126,7 +125,7 @@ class TestNeutronContext(_base.BaseTestCase):
self.assertNotIn('session', ctx_dict)
def test_neutron_context_admin_without_session_to_dict(self):
ctx = _context.get_admin_context_without_session()
ctx = context.get_admin_context_without_session()
ctx_dict = ctx.to_dict()
self.assertIsNone(ctx_dict['user_id'])
self.assertIsNone(ctx_dict['tenant_id'])
@ -134,7 +133,7 @@ class TestNeutronContext(_base.BaseTestCase):
self.assertFalse(hasattr(ctx, 'session'))
def test_neutron_context_elevated_retains_request_id(self):
ctx = _context.Context('user_id', 'tenant_id')
ctx = context.Context('user_id', 'tenant_id')
self.assertFalse(ctx.is_admin)
req_id_before = ctx.request_id
@ -143,7 +142,7 @@ class TestNeutronContext(_base.BaseTestCase):
self.assertEqual(req_id_before, elevated_ctx.request_id)
def test_neutron_context_elevated_idempotent(self):
ctx = _context.Context('user_id', 'tenant_id')
ctx = context.Context('user_id', 'tenant_id')
self.assertFalse(ctx.is_admin)
elevated_ctx = ctx.elevated()
self.assertTrue(elevated_ctx.is_admin)
@ -151,28 +150,28 @@ class TestNeutronContext(_base.BaseTestCase):
self.assertTrue(elevated2_ctx.is_admin)
def test_neutron_context_overwrite(self):
ctx1 = _context.Context('user_id', 'tenant_id')
ctx1 = context.Context('user_id', 'tenant_id')
self.assertEqual(ctx1.request_id,
oslo_context.get_current().request_id)
# If overwrite is not specified, request_id should be updated.
ctx2 = _context.Context('user_id', 'tenant_id')
ctx2 = context.Context('user_id', 'tenant_id')
self.assertNotEqual(ctx2.request_id, ctx1.request_id)
self.assertEqual(ctx2.request_id,
oslo_context.get_current().request_id)
# If overwrite is specified, request_id should be kept.
ctx3 = _context.Context('user_id', 'tenant_id', overwrite=False)
ctx3 = context.Context('user_id', 'tenant_id', overwrite=False)
self.assertNotEqual(ctx3.request_id, ctx2.request_id)
self.assertEqual(ctx2.request_id,
oslo_context.get_current().request_id)
def test_neutron_context_get_admin_context_not_update_local_store(self):
ctx = _context.Context('user_id', 'tenant_id')
ctx = context.Context('user_id', 'tenant_id')
req_id_before = oslo_context.get_current().request_id
self.assertEqual(ctx.request_id, req_id_before)
ctx_admin = _context.get_admin_context()
ctx_admin = context.get_admin_context()
self.assertEqual(req_id_before, oslo_context.get_current().request_id)
self.assertNotEqual(req_id_before, ctx_admin.request_id)
@ -194,21 +193,21 @@ class TestNeutronContext(_base.BaseTestCase):
'project_id': 'tenant_id',
'project_name': 'tenant_name',
}
ctx = _context.Context(**values)
ctx = context.Context(**values)
# apply dict() to get a real dictionary, needed for newer oslo.context
# that returns _DeprecatedPolicyValues object instead
policy_values = dict(ctx.to_policy_values())
self.assertDictSupersetOf(values, policy_values)
self.assertDictSupersetOf(additional_values, policy_values)
@mock.patch.object(_context.ContextBaseWithSession, 'session')
@mock.patch.object(context.ContextBaseWithSession, 'session')
def test_superclass_session(self, mocked_session):
ctx = _context.Context('user_id', 'tenant_id')
ctx = context.Context('user_id', 'tenant_id')
# make sure context uses parent class session that is mocked
self.assertEqual(mocked_session, ctx.session)
def test_session_cached(self):
ctx = _context.Context('user_id', 'tenant_id')
ctx = context.Context('user_id', 'tenant_id')
session1 = ctx.session
session2 = ctx.session
self.assertIs(session1, session2)

View File

@ -13,7 +13,7 @@
import mock
from neutron_lib import _context
from neutron_lib import context
from neutron_lib import policy
from neutron_lib.tests import _base as base
@ -35,34 +35,34 @@ class TestPolicyEnforcer(base.BaseTestCase):
self.assertIsNotNone(policy._ENFORCER)
def test_check_user_is_not_admin(self):
ctx = _context.Context('me', 'my_project')
ctx = context.Context('me', 'my_project')
self.assertFalse(policy.check_is_admin(ctx))
def test_check_user_elevated_is_admin(self):
ctx = _context.Context('me', 'my_project', roles=['user']).elevated()
ctx = context.Context('me', 'my_project', roles=['user']).elevated()
self.assertTrue(policy.check_is_admin(ctx))
def test_check_is_admin_no_roles_no_admin(self):
policy.init(policy_file='no_policy.json')
ctx = _context.Context('me', 'my_project', roles=['user']).elevated()
ctx = context.Context('me', 'my_project', roles=['user']).elevated()
# With no admin role, elevated() should not work.
self.assertFalse(policy.check_is_admin(ctx))
def test_check_is_advsvc_role(self):
ctx = _context.Context('me', 'my_project', roles=['advsvc'])
ctx = context.Context('me', 'my_project', roles=['advsvc'])
self.assertTrue(policy.check_is_advsvc(ctx))
def test_check_is_not_advsvc_user(self):
ctx = _context.Context('me', 'my_project', roles=['user'])
ctx = context.Context('me', 'my_project', roles=['user'])
self.assertFalse(policy.check_is_advsvc(ctx))
def test_check_is_not_advsvc_admin(self):
ctx = _context.Context('me', 'my_project').elevated()
ctx = context.Context('me', 'my_project').elevated()
self.assertTrue(policy.check_is_admin(ctx))
self.assertFalse(policy.check_is_advsvc(ctx))
def test_check_is_advsvc_no_roles_no_advsvc(self):
policy.init(policy_file='no_policy.json')
ctx = _context.Context('me', 'my_project', roles=['advsvc'])
ctx = context.Context('me', 'my_project', roles=['advsvc'])
# No advsvc role in the policy file, so cannot assume the role.
self.assertFalse(policy.check_is_advsvc(ctx))

View File

@ -0,0 +1,12 @@
---
features:
- |
The ``context`` module has been made public. For example:
.. code-block:: python
from neutron_lib import context
ctx = context.get_admin_context()
For more examples, see: https://review.openstack.org/#/c/388157/