Fix for cross tenancy issue with neutron

Relaxing the cross tenancy validation for proxy group, else
for Fw/VPN chains cross tenancy issue occurs in non-admin tenant.

Change-Id: If74586788668384fce239d451e8246edce8ce36e
Closes-Bug: 1590390
This commit is contained in:
Ashutosh Mishra 2016-06-08 17:06:28 +05:30 committed by Hemanth Ravi
parent 376dca9dd2
commit 51c82af293
2 changed files with 52 additions and 0 deletions

View File

@ -13,6 +13,8 @@
import netaddr
import operator
from keystoneclient import exceptions as k_exceptions
from keystoneclient.v2_0 import client as k_client
from neutron._i18n import _LE
from neutron._i18n import _LW
from neutron.api.v2 import attributes
@ -26,6 +28,7 @@ from neutron.extensions import securitygroup as ext_sg
from oslo_config import cfg
from oslo_log import helpers as log
from oslo_log import log as logging
from oslo_utils import excutils
import sqlalchemy as sa
from gbpservice.common import utils
@ -441,6 +444,7 @@ class ResourceMappingDriver(api.PolicyDriver, ImplicitResourceOperations,
@log.log_method_call
def initialize(self):
self._cached_agent_notifier = None
self._resource_owner_tenant_id = None
def _reject_shared(self, object, type):
if object.get('shared'):
@ -457,6 +461,12 @@ class ResourceMappingDriver(api.PolicyDriver, ImplicitResourceOperations,
CrossTenantPolicyTargetGroupL2PolicyNotSupported())
def _reject_cross_tenant_l2p_l3p(self, context):
if context.current['tenant_id'] == self.resource_owner_tenant_id:
# Relax cross tenancy condition when current tenant id is admin.
# Relaxing when l2policy tenant id is of admin, to address the
# case for proxy group where l2policy belongs to admin tenant
# but l3policy belongs to user tenant.
return
# Can't create non shared L2p on a shared L3p
if context.current['l3_policy_id']:
l3p = context._plugin.get_l3_policy(
@ -465,6 +475,31 @@ class ResourceMappingDriver(api.PolicyDriver, ImplicitResourceOperations,
if l3p['tenant_id'] != context.current['tenant_id']:
raise exc.CrossTenantL2PolicyL3PolicyNotSupported()
@property
def resource_owner_tenant_id(self):
if not self._resource_owner_tenant_id:
self._resource_owner_tenant_id = (
self._get_resource_owner_tenant_id())
return self._resource_owner_tenant_id
def _get_resource_owner_tenant_id(self):
# Returns service tenant id, which specified in neutron conf
try:
user, pwd, tenant, auth_url = utils.get_keystone_creds()
keystoneclient = k_client.Client(username=user, password=pwd,
auth_url=auth_url)
tenant = keystoneclient.tenants.find(name=tenant)
return tenant.id
except k_exceptions.NotFound:
with excutils.save_and_reraise_exception(reraise=True):
LOG.error(_LE('No tenant with name %s exists.'), tenant)
except k_exceptions.NoUniqueMatch:
with excutils.save_and_reraise_exception(reraise=True):
LOG.error(_LE('Multiple tenants matches found for %s'), tenant)
except k_exceptions.AuthorizationFailure:
LOG.error(_LE("User: %(user)s dont have permissions"),
{'user': user})
def _reject_non_shared_net_on_shared_l2p(self, context):
if context.current.get('shared') and context.current['network_id']:
net = self._get_network(

View File

@ -107,6 +107,12 @@ class ResourceMappingTestCase(test_plugin.GroupPolicyPluginTestCase):
plugins = manager.NeutronManager.get_service_plugins()
self._gbp_plugin = plugins.get(pconst.GROUP_POLICY)
self._l3_plugin = plugins.get(pconst.L3_ROUTER_NAT)
self.saved_keystone_client = resource_mapping.k_client.Client
resource_mapping.k_client.Client = mock.Mock()
def tearDown(self):
resource_mapping.k_client.Client = self.saved_keystone_client
super(ResourceMappingTestCase, self).tearDown()
def get_plugin_context(self):
return self._plugin, self._context
@ -1311,6 +1317,17 @@ class TestPolicyTargetGroup(ResourceMappingTestCase):
def test_cross_tenant_prs_admin(self):
self._test_cross_tenant_prs(admin=True)
def test_cross_tenant_l2p(self):
l2p = self.create_l2_policy(name="l2p1", tenant_id='anothertenant')
l2p_id = l2p['l2_policy']['id']
data = {'policy_target_group': {'l2_policy_id': l2p_id,
'tenant_id': 'admin'}}
req = self.new_create_request('policy_target_groups', data)
data = self.deserialize(self.fmt, req.get_response(self.ext_api))
self.assertEqual('CrossTenantPolicyTargetGroupL2PolicyNotSupported',
data['NeutronError']['type'])
def test_l2p_update_rejected(self):
# Create two l2 policies.
l2p1 = self.create_l2_policy(name="l2p1")