Support all_tenants search_opts for neutron

Add support for the all_tenants search option when listing
security group with neutron as the backend

The neutron doesn't have `all-tenants` concept. All the security
groups will be returned if the project/tenant id is not passed.

Change-Id: I465c946a006aca21b298f8226f55725a8c36c1f8
Co-authored-by: Xurong Yang <yangxurong@huawei.com>
closes-bug: #1284202
(cherry picked from commit 5da4a31708)
This commit is contained in:
Jeffrey Zhang 2015-06-04 23:44:18 +08:00 committed by Matt Riedemann
parent 3828368ca7
commit 69a4ca0a83
2 changed files with 114 additions and 6 deletions

View File

@ -150,15 +150,28 @@ class SecurityGroupAPI(security_group_base.SecurityGroupBase):
search_opts=None):
"""Returns list of security group rules owned by tenant."""
neutron = neutronapi.get_client(context)
search_opts = {}
params = {}
search_opts = search_opts if search_opts else {}
if names:
search_opts['name'] = names
params['name'] = names
if ids:
search_opts['id'] = ids
if project:
search_opts['tenant_id'] = project
params['id'] = ids
# NOTE(jeffrey4l): list all the security groups when following
# conditions are met
# * names and ids don't exist.
# * it is admin context and all_tenants exist in search_opts.
# * project is not specified.
list_all_tenants = (context.is_admin
and 'all_tenants' in search_opts
and not any([names, ids]))
# NOTE(jeffrey4l): The neutron doesn't have `all-tenants` concept.
# All the security group will be returned if the project/tenant
# id is not passed.
if project and not list_all_tenants:
params['tenant_id'] = project
try:
security_groups = neutron.list_security_groups(**search_opts).get(
security_groups = neutron.list_security_groups(**params).get(
'security_groups')
except n_exc.NeutronClientException:
with excutils.save_and_reraise_exception():

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
#
import mock
from mox3 import mox
from neutronclient.common import exceptions as n_exc
from neutronclient.v2_0 import client
@ -46,6 +47,100 @@ class TestNeutronDriver(test.NoDBTestCase):
sg_api = neutron_driver.SecurityGroupAPI()
sg_api.list(self.context, project=project_id)
def test_list_with_all_tenants_and_admin_context(self):
project_id = '0af70a4d22cf4652824ddc1f2435dd85'
search_opts = {'all_tenants': 1}
security_groups_list = {'security_groups': []}
admin_context = context.RequestContext('user1', project_id, True)
self.mox.ReplayAll()
with mock.patch.object(
self.moxed_client,
'list_security_groups',
return_value=security_groups_list) as mock_list_secgroup:
sg_api = neutron_driver.SecurityGroupAPI()
sg_api.list(admin_context,
project=project_id,
search_opts=search_opts)
mock_list_secgroup.assert_called_once_with()
def test_list_without_all_tenants_and_admin_context(self):
project_id = '0af70a4d22cf4652824ddc1f2435dd85'
security_groups_list = {'security_groups': []}
admin_context = context.RequestContext('user1', project_id, True)
self.mox.ReplayAll()
with mock.patch.object(
self.moxed_client,
'list_security_groups',
return_value=security_groups_list) as mock_list_secgroup:
sg_api = neutron_driver.SecurityGroupAPI()
sg_api.list(admin_context, project=project_id)
mock_list_secgroup.assert_called_once_with(tenant_id=project_id)
def test_list_with_all_tenants_sec_name_and_admin_context(self):
project_id = '0af70a4d22cf4652824ddc1f2435dd85'
search_opts = {'all_tenants': 1}
security_group_names = ['secgroup_ssh']
security_groups_list = {'security_groups': []}
admin_context = context.RequestContext('user1', project_id, True)
self.mox.ReplayAll()
with mock.patch.object(
self.moxed_client,
'list_security_groups',
return_value=security_groups_list) as mock_list_secgroup:
sg_api = neutron_driver.SecurityGroupAPI()
sg_api.list(admin_context, project=project_id,
names=security_group_names,
search_opts=search_opts)
mock_list_secgroup.assert_called_once_with(
name=security_group_names,
tenant_id=project_id)
def test_list_with_all_tenants_sec_name_ids_and_admin_context(self):
project_id = '0af70a4d22cf4652824ddc1f2435dd85'
search_opts = {'all_tenants': 1}
security_group_names = ['secgroup_ssh']
security_group_ids = ['id1']
security_groups_list = {'security_groups': []}
admin_context = context.RequestContext('user1', project_id, True)
self.mox.ReplayAll()
with mock.patch.object(
self.moxed_client,
'list_security_groups',
return_value=security_groups_list) as mock_list_secgroup:
sg_api = neutron_driver.SecurityGroupAPI()
sg_api.list(admin_context, project=project_id,
names=security_group_names,
ids=security_group_ids,
search_opts=search_opts)
mock_list_secgroup.assert_called_once_with(
name=security_group_names,
id=security_group_ids,
tenant_id=project_id)
def test_list_with_all_tenants_not_admin(self):
search_opts = {'all_tenants': 1}
security_groups_list = {'security_groups': []}
self.mox.ReplayAll()
with mock.patch.object(
self.moxed_client,
'list_security_groups',
return_value=security_groups_list) as mock_list_secgroup:
sg_api = neutron_driver.SecurityGroupAPI()
sg_api.list(self.context, project=self.context.tenant,
search_opts=search_opts)
mock_list_secgroup.assert_called_once_with(
tenant_id=self.context.tenant)
def test_get_with_name_duplicated(self):
sg_name = 'web_server'
expected_sg_id = '85cc3048-abc3-43cc-89b3-377341426ac5'