diff --git a/gbpservice/neutron/services/grouppolicy/drivers/resource_mapping.py b/gbpservice/neutron/services/grouppolicy/drivers/resource_mapping.py index bf18be914..775d3278f 100644 --- a/gbpservice/neutron/services/grouppolicy/drivers/resource_mapping.py +++ b/gbpservice/neutron/services/grouppolicy/drivers/resource_mapping.py @@ -13,6 +13,8 @@ import netaddr import operator +from keystoneclient import exceptions as k_exceptions +from keystoneclient.v2_0 import client as k_client from neutron._i18n import _LE from neutron._i18n import _LW from neutron.api.v2 import attributes @@ -26,6 +28,7 @@ from neutron.extensions import securitygroup as ext_sg from oslo_config import cfg from oslo_log import helpers as log from oslo_log import log as logging +from oslo_utils import excutils import sqlalchemy as sa from gbpservice.common import utils @@ -441,6 +444,7 @@ class ResourceMappingDriver(api.PolicyDriver, ImplicitResourceOperations, @log.log_method_call def initialize(self): self._cached_agent_notifier = None + self._resource_owner_tenant_id = None def _reject_shared(self, object, type): if object.get('shared'): @@ -457,6 +461,12 @@ class ResourceMappingDriver(api.PolicyDriver, ImplicitResourceOperations, CrossTenantPolicyTargetGroupL2PolicyNotSupported()) def _reject_cross_tenant_l2p_l3p(self, context): + if context.current['tenant_id'] == self.resource_owner_tenant_id: + # Relax cross tenancy condition when current tenant id is admin. + # Relaxing when l2policy tenant id is of admin, to address the + # case for proxy group where l2policy belongs to admin tenant + # but l3policy belongs to user tenant. + return # Can't create non shared L2p on a shared L3p if context.current['l3_policy_id']: l3p = context._plugin.get_l3_policy( @@ -465,6 +475,31 @@ class ResourceMappingDriver(api.PolicyDriver, ImplicitResourceOperations, if l3p['tenant_id'] != context.current['tenant_id']: raise exc.CrossTenantL2PolicyL3PolicyNotSupported() + @property + def resource_owner_tenant_id(self): + if not self._resource_owner_tenant_id: + self._resource_owner_tenant_id = ( + self._get_resource_owner_tenant_id()) + return self._resource_owner_tenant_id + + def _get_resource_owner_tenant_id(self): + # Returns service tenant id, which specified in neutron conf + try: + user, pwd, tenant, auth_url = utils.get_keystone_creds() + keystoneclient = k_client.Client(username=user, password=pwd, + auth_url=auth_url) + tenant = keystoneclient.tenants.find(name=tenant) + return tenant.id + except k_exceptions.NotFound: + with excutils.save_and_reraise_exception(reraise=True): + LOG.error(_LE('No tenant with name %s exists.'), tenant) + except k_exceptions.NoUniqueMatch: + with excutils.save_and_reraise_exception(reraise=True): + LOG.error(_LE('Multiple tenants matches found for %s'), tenant) + except k_exceptions.AuthorizationFailure: + LOG.error(_LE("User: %(user)s dont have permissions"), + {'user': user}) + def _reject_non_shared_net_on_shared_l2p(self, context): if context.current.get('shared') and context.current['network_id']: net = self._get_network( diff --git a/gbpservice/neutron/tests/unit/services/grouppolicy/test_resource_mapping.py b/gbpservice/neutron/tests/unit/services/grouppolicy/test_resource_mapping.py index 4c15a059c..ba5f832a2 100644 --- a/gbpservice/neutron/tests/unit/services/grouppolicy/test_resource_mapping.py +++ b/gbpservice/neutron/tests/unit/services/grouppolicy/test_resource_mapping.py @@ -107,6 +107,12 @@ class ResourceMappingTestCase(test_plugin.GroupPolicyPluginTestCase): plugins = manager.NeutronManager.get_service_plugins() self._gbp_plugin = plugins.get(pconst.GROUP_POLICY) self._l3_plugin = plugins.get(pconst.L3_ROUTER_NAT) + self.saved_keystone_client = resource_mapping.k_client.Client + resource_mapping.k_client.Client = mock.Mock() + + def tearDown(self): + resource_mapping.k_client.Client = self.saved_keystone_client + super(ResourceMappingTestCase, self).tearDown() def get_plugin_context(self): return self._plugin, self._context @@ -1311,6 +1317,17 @@ class TestPolicyTargetGroup(ResourceMappingTestCase): def test_cross_tenant_prs_admin(self): self._test_cross_tenant_prs(admin=True) + def test_cross_tenant_l2p(self): + l2p = self.create_l2_policy(name="l2p1", tenant_id='anothertenant') + l2p_id = l2p['l2_policy']['id'] + + data = {'policy_target_group': {'l2_policy_id': l2p_id, + 'tenant_id': 'admin'}} + req = self.new_create_request('policy_target_groups', data) + data = self.deserialize(self.fmt, req.get_response(self.ext_api)) + self.assertEqual('CrossTenantPolicyTargetGroupL2PolicyNotSupported', + data['NeutronError']['type']) + def test_l2p_update_rejected(self): # Create two l2 policies. l2p1 = self.create_l2_policy(name="l2p1")