OSC: Remove BGPVPN calls to neutronclient

With [1] the python binding code in Neutronclient prints warning
about future deprecation, change networking-bgpvpn osc client
code to use totally OpenstackSDK.

[1]: https://review.opendev.org/c/openstack/python-neutronclient/+/862371

Change-Id: Iae1378ffabda3d30257e35a6148e205a9f495eb5
Related-Bug: #1999774
Depends-On: https://review.opendev.org/c/openstack/openstacksdk/+/875530
This commit is contained in:
elajkat 2023-02-28 10:06:23 +01:00
parent d342171f9e
commit d4124aed1f
9 changed files with 392 additions and 359 deletions

View File

@ -25,13 +25,12 @@ from osc_lib.utils import columns as column_util
from neutronclient._i18n import _
from neutronclient.osc import utils as nc_osc_utils
from neutronclient.osc.v2.networking_bgpvpn import constants
LOG = logging.getLogger(__name__)
_attr_map = (
('id', 'ID', column_util.LIST_BOTH),
('tenant_id', 'Project', column_util.LIST_LONG_ONLY),
('project_id', 'Project', column_util.LIST_LONG_ONLY),
('name', 'Name', column_util.LIST_BOTH),
('type', 'Type', column_util.LIST_BOTH),
('route_targets', 'Route Targets', column_util.LIST_LONG_ONLY),
@ -166,7 +165,7 @@ def _args2body(client_manager, id, action, args):
args.purge_export_target and args.purge_route_distinguisher) and
(args.route_targets or args.import_targets or
args.export_targets or args.route_distinguishers)):
bgpvpn = client_manager.neutronclient.show_bgpvpn(id)['bgpvpn']
bgpvpn = client_manager.network.get_bgpvpn(id)
attrs = {}
@ -221,7 +220,7 @@ def _args2body(client_manager, id, action, args):
set(bgpvpn['route_distinguishers']) -
set(args.route_distinguishers))
return {constants.BGPVPN: attrs}
return attrs
class CreateBgpvpn(command.ShowOne):
@ -241,7 +240,7 @@ class CreateBgpvpn(command.ShowOne):
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
client = self.app.client_manager.network
attrs = {}
if parsed_args.name is not None:
attrs['name'] = str(parsed_args.name)
@ -266,9 +265,8 @@ class CreateBgpvpn(command.ShowOne):
parsed_args.project_domain,
).id
attrs['tenant_id'] = project_id
body = {constants.BGPVPN: attrs}
obj = client.create_bgpvpn(body)[constants.BGPVPN]
columns, display_columns = column_util.get_columns(obj, _attr_map)
obj = client.create_bgpvpn(**attrs)
display_columns, columns = nc_osc_utils._get_columns(obj)
data = osc_utils.get_dict_properties(obj, columns,
formatters=_formatters)
return display_columns, data
@ -288,10 +286,10 @@ class SetBgpvpn(command.Command):
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
id = client.find_resource(constants.BGPVPN, parsed_args.bgpvpn)['id']
client = self.app.client_manager.network
id = client.find_bgpvpn(parsed_args.bgpvpn)['id']
body = _args2body(self.app.client_manager, id, 'set', parsed_args)
client.update_bgpvpn(id, body)
client.update_bgpvpn(id, **body)
class UnsetBgpvpn(command.Command):
@ -308,10 +306,10 @@ class UnsetBgpvpn(command.Command):
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
id = client.find_resource(constants.BGPVPN, parsed_args.bgpvpn)['id']
client = self.app.client_manager.network
id = client.find_bgpvpn(parsed_args.bgpvpn)['id']
body = _args2body(self.app.client_manager, id, 'unset', parsed_args)
client.update_bgpvpn(id, body)
client.update_bgpvpn(id, **body)
class DeleteBgpvpn(command.Command):
@ -328,11 +326,11 @@ class DeleteBgpvpn(command.Command):
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
client = self.app.client_manager.network
fails = 0
for id_or_name in parsed_args.bgpvpns:
try:
id = client.find_resource(constants.BGPVPN, id_or_name)['id']
id = client.find_bgpvpn(id_or_name)['id']
client.delete_bgpvpn(id)
LOG.warning("BGP VPN %(id)s deleted", {'id': id})
except Exception as e:
@ -368,7 +366,7 @@ class ListBgpvpn(command.Lister):
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
client = self.app.client_manager.network
params = {}
if parsed_args.project is not None:
project_id = nc_osc_utils.find_project(
@ -379,7 +377,7 @@ class ListBgpvpn(command.Lister):
params['tenant_id'] = project_id
if parsed_args.property:
params.update(parsed_args.property)
objs = client.list_bgpvpns(**params)[constants.BGPVPNS]
objs = client.bgpvpns(**params)
headers, columns = column_util.get_column_definitions(
_attr_map, long_listing=parsed_args.long)
return (headers, (osc_utils.get_dict_properties(
@ -399,10 +397,10 @@ class ShowBgpvpn(command.ShowOne):
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
id = client.find_resource(constants.BGPVPN, parsed_args.bgpvpn)['id']
obj = client.show_bgpvpn(id)[constants.BGPVPN]
columns, display_columns = column_util.get_columns(obj, _attr_map)
client = self.app.client_manager.network
id = client.find_bgpvpn(parsed_args.bgpvpn)['id']
obj = client.get_bgpvpn(id)
display_columns, columns = nc_osc_utils._get_columns(obj)
data = osc_utils.get_dict_properties(obj, columns,
formatters=_formatters)
return display_columns, data

View File

@ -108,6 +108,7 @@ class BgpvpnPortAssoc(object):
LOG.warning("Unknown route type %s (%s).", route['type'],
route)
data.pop('routes', None)
return data
def _get_common_parser(self, parser):
"""Adds to parser arguments common to create, set and unset commands.
@ -201,15 +202,13 @@ class BgpvpnPortAssoc(object):
)
def _args2body(self, bgpvpn_id, args):
client = self.app.client_manager.neutronclient
client = self.app.client_manager.network
attrs = {}
if self._action != 'create':
assoc = client.find_resource_by_id(
self._resource,
assoc = client.find_bgpvpn_port_association(
args.resource_association_id,
cmd_resource='bgpvpn_%s_assoc' % self._assoc_res_name,
parent_id=bgpvpn_id)
bgpvpn_id=bgpvpn_id)
else:
assoc = {'routes': []}
@ -248,7 +247,7 @@ class BgpvpnPortAssoc(object):
else:
routes = args.bgpvpn_routes
args_bgpvpn_routes = {
client.find_resource(constants.BGPVPN, r['bgpvpn'])['id']:
client.find_bgpvpn(r['bgpvpn']).id:
r.get('local_pref')
for r in routes
}
@ -281,7 +280,7 @@ class BgpvpnPortAssoc(object):
route['local_pref'] = int(local_pref)
attrs.setdefault('routes', []).append(route)
return {self._resource: attrs}
return attrs
class CreateBgpvpnPortAssoc(BgpvpnPortAssoc,

View File

@ -24,7 +24,6 @@ from osc_lib.utils import columns as column_util
from neutronclient._i18n import _
from neutronclient.osc import utils as nc_osc_utils
from neutronclient.osc.v2.networking_bgpvpn import constants
LOG = logging.getLogger(__name__)
@ -56,35 +55,32 @@ class CreateBgpvpnResAssoc(command.ShowOne):
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
client = self.app.client_manager.network
create_method = getattr(
client, 'create_bgpvpn_%s_assoc' % self._assoc_res_name)
bgpvpn = client.find_resource(constants.BGPVPN, parsed_args.bgpvpn)
assoc_res = client.find_resource(self._assoc_res_name,
parsed_args.resource)
body = {
self._resource: {
'%s_id' % self._assoc_res_name: assoc_res['id'],
},
}
client, 'create_bgpvpn_%s_association' % self._assoc_res_name)
bgpvpn = client.find_bgpvpn(parsed_args.bgpvpn)
find_res_method = getattr(
client, 'find_%s' % self._assoc_res_name)
assoc_res = find_res_method(parsed_args.resource)
body = {'%s_id' % self._assoc_res_name: assoc_res['id']}
if 'project' in parsed_args and parsed_args.project is not None:
project_id = nc_osc_utils.find_project(
self.app.client_manager.identity,
parsed_args.project,
parsed_args.project_domain,
).id
body[self._resource]['tenant_id'] = project_id
body['tenant_id'] = project_id
arg2body = getattr(self, '_args2body', None)
if callable(arg2body):
body[self._resource].update(
arg2body(bgpvpn['id'], parsed_args)[self._resource])
body.update(
arg2body(bgpvpn['id'], parsed_args))
obj = create_method(bgpvpn['id'], body)[self._resource]
obj = create_method(bgpvpn['id'], **body)
transform = getattr(self, '_transform_resource', None)
if callable(transform):
transform(obj)
columns, display_columns = column_util.get_columns(obj, self._attr_map)
display_columns, columns = nc_osc_utils._get_columns(obj)
data = osc_utils.get_dict_properties(obj, columns,
formatters=self._formatters)
return display_columns, data
@ -116,15 +112,15 @@ class SetBgpvpnResAssoc(command.Command):
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
client = self.app.client_manager.network
update_method = getattr(
client, 'update_bgpvpn_%s_assoc' % self._assoc_res_name)
bgpvpn = client.find_resource(constants.BGPVPN, parsed_args.bgpvpn)
client, 'update_bgpvpn_%s_association' % self._assoc_res_name)
bgpvpn = client.find_bgpvpn(parsed_args.bgpvpn)
arg2body = getattr(self, '_args2body', None)
if callable(arg2body):
body = arg2body(bgpvpn['id'], parsed_args)
update_method(bgpvpn['id'], parsed_args.resource_association_id,
body)
**body)
class UnsetBgpvpnResAssoc(SetBgpvpnResAssoc):
@ -153,10 +149,10 @@ class DeleteBgpvpnResAssoc(command.Command):
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
client = self.app.client_manager.network
delete_method = getattr(
client, 'delete_bgpvpn_%s_assoc' % self._assoc_res_name)
bgpvpn = client.find_resource(constants.BGPVPN, parsed_args.bgpvpn)
client, 'delete_bgpvpn_%s_association' % self._assoc_res_name)
bgpvpn = client.find_bgpvpn(parsed_args.bgpvpn)
fails = 0
for id in parsed_args.resource_association_ids:
try:
@ -206,22 +202,27 @@ class ListBgpvpnResAssoc(command.Lister):
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
client = self.app.client_manager.network
list_method = getattr(client,
'list_bgpvpn_%s_assocs' % self._assoc_res_name)
bgpvpn = client.find_resource(constants.BGPVPN, parsed_args.bgpvpn)
'bgpvpn_%s_associations' % self._assoc_res_name)
bgpvpn = client.find_bgpvpn(parsed_args.bgpvpn)
params = {}
if parsed_args.property:
params.update(parsed_args.property)
objs = list_method(bgpvpn['id'],
retrieve_all=True, **params)[self._resource_plural]
retrieve_all=True, **params)
transform = getattr(self, '_transform_resource', None)
transformed_objs = []
if callable(transform):
[transform(obj) for obj in objs]
for obj in objs:
transformed_objs.append(transform(obj))
else:
transformed_objs = list(objs)
headers, columns = column_util.get_column_definitions(
self._attr_map, long_listing=parsed_args.long)
return (headers, (osc_utils.get_dict_properties(
s, columns, formatters=self._formatters) for s in objs))
s, columns, formatters=self._formatters)
for s in transformed_objs))
class ShowBgpvpnResAssoc(command.ShowOne):
@ -243,20 +244,16 @@ class ShowBgpvpnResAssoc(command.ShowOne):
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
show_method = getattr(client,
'show_bgpvpn_%s_assoc' % self._assoc_res_name)
bgpvpn = client.find_resource(constants.BGPVPN, parsed_args.bgpvpn)
assoc = client.find_resource_by_id(
self._resource,
parsed_args.resource_association_id,
cmd_resource='bgpvpn_%s_assoc' % self._assoc_res_name,
parent_id=bgpvpn['id'])
obj = show_method(bgpvpn['id'], assoc['id'])[self._resource]
client = self.app.client_manager.network
show_method = getattr(
client, 'get_bgpvpn_%s_association' % self._assoc_res_name)
bgpvpn = client.find_bgpvpn(parsed_args.bgpvpn)
obj = show_method(bgpvpn['id'],
parsed_args.resource_association_id)
transform = getattr(self, '_transform_resource', None)
if callable(transform):
transform(obj)
columns, display_columns = column_util.get_columns(obj, self._attr_map)
display_columns, columns = nc_osc_utils._get_columns(obj)
data = osc_utils.get_dict_properties(obj, columns,
formatters=self._formatters)
return display_columns, data

View File

@ -75,14 +75,13 @@ class BgpvpnRouterAssoc(object):
)
def _args2body(self, _, args):
attrs = {}
attrs = {'advertise_extra_routes': False}
if args.advertise_extra_routes:
attrs['advertise_extra_routes'] = self._action != 'unset'
elif args.no_advertise_extra_routes:
attrs['advertise_extra_routes'] = self._action == 'unset'
return {self._resource: attrs}
return attrs
class CreateBgpvpnRouterAssoc(BgpvpnRouterAssoc, CreateBgpvpnResAssoc):

View File

@ -17,10 +17,11 @@
import copy
from unittest import mock
from openstack.network.v2 import bgpvpn as _bgpvpn
from openstack import resource as sdk_resource
from osc_lib.utils import columns as column_util
from neutronclient.osc import utils as nc_osc_utils
from neutronclient.osc.v2.networking_bgpvpn import constants
from neutronclient.osc.v2.networking_bgpvpn.resource_association import\
CreateBgpvpnResAssoc
from neutronclient.osc.v2.networking_bgpvpn.resource_association import\
@ -61,50 +62,45 @@ class TestNeutronClientBgpvpn(test_fakes.TestNeutronClientOSCV2):
side_effect=lambda _, name_or_id, __: mock.Mock(id=name_or_id))
class FakeBgpvpn(object):
"""Fake BGP VPN with attributes."""
def create_one_bgpvpn(attrs=None):
"""Create a fake BGP VPN."""
@staticmethod
def create_one_bgpvpn(attrs=None):
"""Create a fake BGP VPN."""
attrs = attrs or {}
attrs = attrs or {}
# Set default attributes.
bgpvpn_attrs = {
'id': 'fake_bgpvpn_id',
'tenant_id': _FAKE_PROJECT_ID,
'name': '',
'type': 'l3',
'route_targets': [],
'import_targets': [],
'export_targets': [],
'route_distinguishers': [],
'networks': [],
'routers': [],
'ports': [],
'vni': 100,
'local_pref': 777,
}
# Set default attributes.
bgpvpn_attrs = {
'id': 'fake_bgpvpn_id',
'tenant_id': _FAKE_PROJECT_ID,
'name': '',
'type': 'l3',
'route_targets': [],
'import_targets': [],
'export_targets': [],
'route_distinguishers': [],
'networks': [],
'routers': [],
'ports': [],
'vni': 100,
'local_pref': 777,
}
# Overwrite default attributes.
bgpvpn_attrs.update(attrs)
return _bgpvpn.BgpVpn(**bgpvpn_attrs)
# Overwrite default attributes.
bgpvpn_attrs.update(attrs)
return copy.deepcopy(bgpvpn_attrs)
def create_bgpvpns(attrs=None, count=1):
"""Create multiple fake BGP VPN."""
@staticmethod
def create_bgpvpns(attrs=None, count=1):
"""Create multiple fake BGP VPN."""
bgpvpns = []
for i in range(0, count):
if attrs is None:
attrs = {'id': 'fake_id%d' % i}
elif getattr(attrs, 'id', None) is None:
attrs['id'] = 'fake_id%d' % i
bgpvpns.append(create_one_bgpvpn(attrs))
bgpvpns = []
for i in range(0, count):
if attrs is None:
attrs = {'id': 'fake_id%d' % i}
elif getattr(attrs, 'id', None) is None:
attrs['id'] = 'fake_id%d' % i
bgpvpns.append(FakeBgpvpn.create_one_bgpvpn(attrs))
return {constants.BGPVPNS: bgpvpns}
return bgpvpns
class BgpvpnFakeAssoc(object):
@ -114,9 +110,10 @@ class BgpvpnFakeAssoc(object):
_attr_map = (
('id', 'ID', column_util.LIST_BOTH),
('tenant_id', 'Project', column_util.LIST_LONG_ONLY),
('%s_id' % _assoc_res_name, '%s ID' % _assoc_res_name.capitalize(),
column_util.LIST_BOTH),
('name', 'Name', column_util.LIST_BOTH),
('project_id', 'Project ID', column_util.LIST_BOTH),
)
_formatters = {}
@ -152,11 +149,12 @@ class BgpvpnFakeRouterAssoc(object):
_attr_map = (
('id', 'ID', column_util.LIST_BOTH),
('tenant_id', 'Project', column_util.LIST_LONG_ONLY),
('%s_id' % _assoc_res_name, '%s ID' % _assoc_res_name.capitalize(),
column_util.LIST_BOTH),
('advertise_extra_routes', 'Advertise extra routes',
column_util.LIST_LONG_ONLY),
('name', 'Name', column_util.LIST_BOTH),
('project_id', 'Project ID', column_util.LIST_BOTH),
)
_formatters = {}
@ -174,71 +172,99 @@ class ShowBgpvpnFakeRouterAssoc(BgpvpnFakeRouterAssoc, ShowBgpvpnRouterAssoc):
pass
class FakeResource(object):
"""Fake resource with minimal attributes."""
class FakeResource(sdk_resource.Resource):
resource_key = 'fakeresource'
resources_key = 'fakeresources'
base_path = '/bgpvpn/fakeresources'
@staticmethod
def create_one_resource(attrs=None):
"""Create a fake resource."""
_allow_unknown_attrs_in_body = True
attrs = attrs or {}
# capabilities
allow_create = True
allow_fetch = True
allow_commit = True
allow_delete = True
allow_list = True
# Set default attributes.
res_attrs = {
'id': 'fake_resource_id',
'tenant_id': _FAKE_PROJECT_ID,
}
# Overwrite default attributes.
res_attrs.update(attrs)
return copy.deepcopy(res_attrs)
@staticmethod
def create_resources(attrs=None, count=1):
"""Create multiple fake resources."""
resources = []
for i in range(0, count):
if attrs is None:
attrs = {'id': 'fake_id%d' % i}
elif getattr(attrs, 'id', None) is None:
attrs['id'] = 'fake_id%d' % i
resources.append(FakeResource.create_one_resource(attrs))
return {'%ss' % BgpvpnFakeAssoc._assoc_res_name: resources}
id = sdk_resource.Body('id')
tenant_id = sdk_resource.Body('tenant_id', deprecated=True)
project_id = sdk_resource.Body('project_id', alias='tenant_id')
class FakeResAssoc(object):
"""Fake resource association with minimal attributes."""
class FakeResoureAssociation(sdk_resource.Resource):
resource_key = 'fakeresourceassociation'
resources_key = 'fakeresourceassociations'
base_path = '/bgpvpn/fakeresourceassociations'
@staticmethod
def create_one_resource_association(resource, attrs=None):
"""Create a fake resource association."""
_allow_unknown_attrs_in_body = True
attrs = attrs or {}
# capabilities
allow_create = True
allow_fetch = True
allow_commit = True
allow_delete = True
allow_list = True
id = sdk_resource.Body('id')
tenant_id = sdk_resource.Body('tenant_id', deprecated=True)
project_id = sdk_resource.Body('project_id', alias='tenant_id')
def create_one_resource(attrs=None):
"""Create a fake resource."""
attrs = attrs or {}
# Set default attributes.
res_attrs = {
'id': 'fake_resource_id',
'tenant_id': _FAKE_PROJECT_ID,
}
# Overwrite default attributes.
res_attrs.update(attrs)
return FakeResource(**res_attrs)
def create_resources(attrs=None, count=1):
"""Create multiple fake resources."""
resources = []
for i in range(0, count):
if attrs is None:
attrs = {'id': 'fake_id%d' % i}
elif getattr(attrs, 'id', None) is None:
attrs['id'] = 'fake_id%d' % i
resources.append(create_one_resource(attrs))
return resources
def create_one_resource_association(resource, attrs=None):
"""Create a fake resource association."""
attrs = attrs or {}
res_assoc_attrs = {
'id': 'fake_association_id',
'tenant_id': resource['tenant_id'],
'fake_resource_id': resource['id'],
}
# Overwrite default attributes.
res_assoc_attrs.update(attrs)
return FakeResoureAssociation(**res_assoc_attrs)
def create_resource_associations(resources):
"""Create multiple fake resource associations."""
res_assocs = []
for idx, resource in enumerate(resources):
res_assoc_attrs = {
'id': 'fake_association_id',
'id': 'fake_association_id%d' % idx,
'tenant_id': resource['tenant_id'],
'fake_resource_id': resource['id'],
}
res_assocs.append(copy.deepcopy(res_assoc_attrs))
# Overwrite default attributes.
res_assoc_attrs.update(attrs)
return copy.deepcopy(res_assoc_attrs)
@staticmethod
def create_resource_associations(resources):
"""Create multiple fake resource associations."""
res_assocs = []
for idx, resource in enumerate(
resources['%ss' % BgpvpnFakeAssoc._assoc_res_name]):
res_assoc_attrs = {
'id': 'fake_association_id%d' % idx,
'tenant_id': resource['tenant_id'],
'fake_resource_id': resource['id'],
}
res_assocs.append(copy.deepcopy(res_assoc_attrs))
return {BgpvpnFakeAssoc._resource_plural: res_assocs}
return res_assocs

View File

@ -23,7 +23,6 @@ from osc_lib import utils as osc_utils
from osc_lib.utils import columns as column_util
from neutronclient.osc.v2.networking_bgpvpn import bgpvpn
from neutronclient.osc.v2.networking_bgpvpn import constants
from neutronclient.tests.unit.osc.v2.networking_bgpvpn import fakes
@ -55,9 +54,9 @@ class TestCreateBgpvpn(fakes.TestNeutronClientBgpvpn):
self.cmd = bgpvpn.CreateBgpvpn(self.app, self.namespace)
def test_create_bgpvpn_with_no_args(self):
fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn()
self.neutronclient.create_bgpvpn = mock.Mock(
return_value={constants.BGPVPN: fake_bgpvpn})
fake_bgpvpn = fakes.create_one_bgpvpn()
self.networkclient.create_bgpvpn = mock.Mock(
return_value=fake_bgpvpn)
arglist = []
verifylist = [
('project', None),
@ -75,10 +74,10 @@ class TestCreateBgpvpn(fakes.TestNeutronClientBgpvpn):
cols, data = self.cmd.take_action(parsed_args)
self.neutronclient.create_bgpvpn.assert_called_once_with(
{constants.BGPVPN: {'type': 'l3'}})
self.assertEqual(sorted_headers, cols)
self.assertItemEqual(_get_data(fake_bgpvpn), data)
self.networkclient.create_bgpvpn.assert_called_once_with(
**{'type': 'l3'})
self.assertEqual(sorted(sorted_columns), sorted(cols))
def test_create_bgpvpn_with_all_args(self):
attrs = {
@ -92,9 +91,9 @@ class TestCreateBgpvpn(fakes.TestNeutronClientBgpvpn):
'export_targets': ['fake_ert1', 'fake_ert2', 'fake_ert3'],
'route_distinguishers': ['fake_rd1', 'fake_rd2', 'fake_rd3'],
}
fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn(attrs)
self.neutronclient.create_bgpvpn = mock.Mock(
return_value={constants.BGPVPN: fake_bgpvpn})
fake_bgpvpn = fakes.create_one_bgpvpn(attrs)
self.networkclient.create_bgpvpn = mock.Mock(
return_value=fake_bgpvpn)
arglist = [
'--project', fake_bgpvpn['tenant_id'],
'--name', fake_bgpvpn['name'],
@ -126,21 +125,18 @@ class TestCreateBgpvpn(fakes.TestNeutronClientBgpvpn):
cols, data = self.cmd.take_action(parsed_args)
fake_bgpvpn_call = copy.deepcopy(fake_bgpvpn)
fake_bgpvpn_call.pop('id')
fake_bgpvpn_call.pop('networks')
fake_bgpvpn_call.pop('routers')
fake_bgpvpn_call.pop('ports')
fake_bgpvpn_call = copy.deepcopy(attrs)
self.neutronclient.create_bgpvpn.assert_called_once_with(
{constants.BGPVPN: fake_bgpvpn_call})
self.assertEqual(sorted_headers, cols)
self.assertItemEqual(_get_data(fake_bgpvpn), data)
self.networkclient.create_bgpvpn.assert_called_once_with(
**fake_bgpvpn_call)
self.assertEqual(sorted(sorted_columns), sorted(cols))
class TestSetBgpvpn(fakes.TestNeutronClientBgpvpn):
def setUp(self):
super(TestSetBgpvpn, self).setUp()
self.networkclient.find_bgpvpn = mock.Mock(
side_effect=lambda name_or_id: {'id': name_or_id})
self.cmd = bgpvpn.SetBgpvpn(self.app, self.namespace)
def test_set_bgpvpn(self):
@ -150,10 +146,10 @@ class TestSetBgpvpn(fakes.TestNeutronClientBgpvpn):
'export_targets': ['set_ert1', 'set_ert2', 'set_ert3'],
'route_distinguishers': ['set_rd1', 'set_rd2', 'set_rd3'],
}
fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn(attrs)
self.neutronclient.show_bgpvpn = mock.Mock(
return_value={constants.BGPVPN: fake_bgpvpn})
self.neutronclient.update_bgpvpn = mock.Mock()
fake_bgpvpn = fakes.create_one_bgpvpn(attrs)
self.networkclient.get_bgpvpn = mock.Mock(
return_value=fake_bgpvpn)
self.networkclient.update_bgpvpn = mock.Mock()
arglist = [
fake_bgpvpn['id'],
'--name', 'set_name',
@ -190,14 +186,14 @@ class TestSetBgpvpn(fakes.TestNeutronClientBgpvpn):
'route_distinguishers': list(
set(fake_bgpvpn['route_distinguishers']) | set(['set_rd1'])),
}
self.neutronclient.update_bgpvpn.assert_called_once_with(
fake_bgpvpn['id'], {constants.BGPVPN: attrs})
self.networkclient.update_bgpvpn.assert_called_once_with(
fake_bgpvpn['id'], **attrs)
self.assertIsNone(result)
def test_set_bgpvpn_with_purge_list(self):
fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn()
self.neutronclient.show_bgpvpn = mock.Mock(
return_value={constants.BGPVPN: fake_bgpvpn})
fake_bgpvpn = fakes.create_one_bgpvpn()
self.networkclient.get_bgpvpn = mock.Mock(
return_value=fake_bgpvpn)
self.neutronclient.update_bgpvpn = mock.Mock()
arglist = [
fake_bgpvpn['id'],
@ -232,14 +228,16 @@ class TestSetBgpvpn(fakes.TestNeutronClientBgpvpn):
'export_targets': [],
'route_distinguishers': [],
}
self.neutronclient.update_bgpvpn.assert_called_once_with(
fake_bgpvpn['id'], {constants.BGPVPN: attrs})
self.networkclient.update_bgpvpn.assert_called_once_with(
fake_bgpvpn['id'], **attrs)
self.assertIsNone(result)
class TestUnsetBgpvpn(fakes.TestNeutronClientBgpvpn):
def setUp(self):
super(TestUnsetBgpvpn, self).setUp()
self.networkclient.find_bgpvpn = mock.Mock(
side_effect=lambda name_or_id: {'id': name_or_id})
self.cmd = bgpvpn.UnsetBgpvpn(self.app, self.namespace)
def test_unset_bgpvpn(self):
@ -249,10 +247,10 @@ class TestUnsetBgpvpn(fakes.TestNeutronClientBgpvpn):
'export_targets': ['unset_ert1', 'unset_ert2', 'unset_ert3'],
'route_distinguishers': ['unset_rd1', 'unset_rd2', 'unset_rd3'],
}
fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn(attrs)
self.neutronclient.show_bgpvpn = mock.Mock(
return_value={constants.BGPVPN: fake_bgpvpn})
self.neutronclient.update_bgpvpn = mock.Mock()
fake_bgpvpn = fakes.create_one_bgpvpn(attrs)
self.networkclient.get_bgpvpn = mock.Mock(
return_value=fake_bgpvpn)
self.networkclient.update_bgpvpn = mock.Mock()
arglist = [
fake_bgpvpn['id'],
'--route-target', 'unset_rt1',
@ -286,14 +284,14 @@ class TestUnsetBgpvpn(fakes.TestNeutronClientBgpvpn):
'route_distinguishers': list(
set(fake_bgpvpn['route_distinguishers']) - set(['unset_rd1'])),
}
self.neutronclient.update_bgpvpn.assert_called_once_with(
fake_bgpvpn['id'], {constants.BGPVPN: attrs})
self.networkclient.update_bgpvpn.assert_called_once_with(
fake_bgpvpn['id'], **attrs)
self.assertIsNone(result)
def test_unset_bgpvpn_with_purge_list(self):
fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn()
self.neutronclient.show_bgpvpn = mock.Mock(
return_value={constants.BGPVPN: fake_bgpvpn})
fake_bgpvpn = fakes.create_one_bgpvpn()
self.networkclient.show_bgpvpn = mock.Mock(
return_value=fake_bgpvpn)
self.neutronclient.update_bgpvpn = mock.Mock()
arglist = [
fake_bgpvpn['id'],
@ -328,21 +326,21 @@ class TestUnsetBgpvpn(fakes.TestNeutronClientBgpvpn):
'export_targets': [],
'route_distinguishers': [],
}
self.neutronclient.update_bgpvpn.assert_called_once_with(
fake_bgpvpn['id'], {constants.BGPVPN: attrs})
self.networkclient.update_bgpvpn.assert_called_once_with(
fake_bgpvpn['id'], **attrs)
self.assertIsNone(result)
class TestDeleteBgpvpn(fakes.TestNeutronClientBgpvpn):
def setUp(self):
super(TestDeleteBgpvpn, self).setUp()
self.neutronclient.find_resource = mock.Mock(
side_effect=lambda _, name_or_id: {'id': name_or_id})
self.networkclient.find_bgpvpn = mock.Mock(
side_effect=lambda name_or_id: {'id': name_or_id})
self.cmd = bgpvpn.DeleteBgpvpn(self.app, self.namespace)
def test_delete_one_bgpvpn(self):
fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn()
self.neutronclient.delete_bgpvpn = mock.Mock()
fake_bgpvpn = fakes.create_one_bgpvpn()
self.networkclient.delete_bgpvpn = mock.Mock()
arglist = [
fake_bgpvpn['id'],
]
@ -354,15 +352,14 @@ class TestDeleteBgpvpn(fakes.TestNeutronClientBgpvpn):
result = self.cmd.take_action(parsed_args)
self.neutronclient.delete_bgpvpn.assert_called_once_with(
self.networkclient.delete_bgpvpn.assert_called_once_with(
fake_bgpvpn['id'])
self.assertIsNone(result)
def test_delete_multi_bpgvpn(self):
fake_bgpvpns = fakes.FakeBgpvpn.create_bgpvpns(count=3)
fake_bgpvpn_ids = [fake_bgpvpn['id'] for fake_bgpvpn in
fake_bgpvpns[constants.BGPVPNS]]
self.neutronclient.delete_bgpvpn = mock.Mock()
fake_bgpvpns = fakes.create_bgpvpns(count=3)
fake_bgpvpn_ids = [fake_bgpvpn['id'] for fake_bgpvpn in fake_bgpvpns]
self.networkclient.delete_bgpvpn = mock.Mock()
arglist = fake_bgpvpn_ids
verifylist = [
('bgpvpns', fake_bgpvpn_ids),
@ -372,20 +369,19 @@ class TestDeleteBgpvpn(fakes.TestNeutronClientBgpvpn):
result = self.cmd.take_action(parsed_args)
self.neutronclient.delete_bgpvpn.assert_has_calls(
self.networkclient.delete_bgpvpn.assert_has_calls(
[mock.call(id) for id in fake_bgpvpn_ids])
self.assertIsNone(result)
def test_delete_multi_bpgvpn_with_unknown(self):
count = 3
fake_bgpvpns = fakes.FakeBgpvpn.create_bgpvpns(count=count)
fake_bgpvpn_ids = [fake_bgpvpn['id'] for fake_bgpvpn in
fake_bgpvpns[constants.BGPVPNS]]
fake_bgpvpns = fakes.create_bgpvpns(count=count)
fake_bgpvpn_ids = [fake_bgpvpn['id'] for fake_bgpvpn in fake_bgpvpns]
def raise_unknonw_resource(resource_path, name_or_id):
if str(count - 2) in name_or_id:
raise Exception()
self.neutronclient.delete_bgpvpn = mock.Mock(
self.networkclient.delete_bgpvpn = mock.Mock(
side_effect=raise_unknonw_resource)
arglist = fake_bgpvpn_ids
verifylist = [
@ -397,7 +393,7 @@ class TestDeleteBgpvpn(fakes.TestNeutronClientBgpvpn):
self.assertRaises(exceptions.CommandError, self.cmd.take_action,
parsed_args)
self.neutronclient.delete_bgpvpn.assert_has_calls(
self.networkclient.delete_bgpvpn.assert_has_calls(
[mock.call(id) for id in fake_bgpvpn_ids])
@ -408,8 +404,8 @@ class TestListBgpvpn(fakes.TestNeutronClientBgpvpn):
def test_list_all_bgpvpn(self):
count = 3
fake_bgpvpns = fakes.FakeBgpvpn.create_bgpvpns(count=count)
self.neutronclient.list_bgpvpns = mock.Mock(return_value=fake_bgpvpns)
fake_bgpvpns = fakes.create_bgpvpns(count=count)
self.networkclient.bgpvpns = mock.Mock(return_value=fake_bgpvpns)
arglist = []
verifylist = []
@ -417,17 +413,17 @@ class TestListBgpvpn(fakes.TestNeutronClientBgpvpn):
headers, data = self.cmd.take_action(parsed_args)
self.neutronclient.list_bgpvpns.assert_called_once()
self.networkclient.bgpvpns.assert_called_once()
self.assertEqual(headers, list(headers_short))
self.assertListItemEqual(
list(data),
[_get_data(fake_bgpvpn, columns_short) for fake_bgpvpn
in fake_bgpvpns[constants.BGPVPNS]])
in fake_bgpvpns])
def test_list_all_bgpvpn_long_mode(self):
count = 3
fake_bgpvpns = fakes.FakeBgpvpn.create_bgpvpns(count=count)
self.neutronclient.list_bgpvpns = mock.Mock(return_value=fake_bgpvpns)
fake_bgpvpns = fakes.create_bgpvpns(count=count)
self.networkclient.bgpvpns = mock.Mock(return_value=fake_bgpvpns)
arglist = [
'--long',
]
@ -439,20 +435,20 @@ class TestListBgpvpn(fakes.TestNeutronClientBgpvpn):
headers, data = self.cmd.take_action(parsed_args)
self.neutronclient.list_bgpvpns.assert_called_once()
self.networkclient.bgpvpns.assert_called_once()
self.assertEqual(headers, list(headers_long))
self.assertListItemEqual(
list(data),
[_get_data(fake_bgpvpn, columns_long) for fake_bgpvpn
in fake_bgpvpns[constants.BGPVPNS]])
in fake_bgpvpns])
def test_list_project_bgpvpn(self):
count = 3
project_id = 'list_fake_project_id'
attrs = {'tenant_id': project_id}
fake_bgpvpns = fakes.FakeBgpvpn.create_bgpvpns(count=count,
attrs=attrs)
self.neutronclient.list_bgpvpns = mock.Mock(return_value=fake_bgpvpns)
fake_bgpvpns = fakes.create_bgpvpns(count=count,
attrs=attrs)
self.networkclient.bgpvpns = mock.Mock(return_value=fake_bgpvpns)
arglist = [
'--project', project_id,
]
@ -464,24 +460,23 @@ class TestListBgpvpn(fakes.TestNeutronClientBgpvpn):
headers, data = self.cmd.take_action(parsed_args)
self.neutronclient.list_bgpvpns.assert_called_once_with(
self.networkclient.bgpvpns.assert_called_once_with(
tenant_id=project_id)
self.assertEqual(headers, list(headers_short))
self.assertListItemEqual(
list(data),
[_get_data(fake_bgpvpn, columns_short) for fake_bgpvpn
in fake_bgpvpns[constants.BGPVPNS]])
in fake_bgpvpns])
def test_list_bgpvpn_with_filters(self):
count = 3
name = 'fake_id0'
layer_type = 'l2'
attrs = {'type': layer_type}
fake_bgpvpns = fakes.FakeBgpvpn.create_bgpvpns(count=count,
attrs=attrs)
returned_bgpvpn = fake_bgpvpns[constants.BGPVPNS][0]
self.neutronclient.list_bgpvpns = mock.Mock(
return_value={constants.BGPVPNS: [returned_bgpvpn]})
fake_bgpvpns = fakes.create_bgpvpns(count=count,
attrs=attrs)
returned_bgpvpn = fake_bgpvpns[0]
self.networkclient.bgpvpns = mock.Mock(return_value=[returned_bgpvpn])
arglist = [
'--property', 'name=%s' % name,
'--property', 'type=%s' % layer_type,
@ -494,7 +489,7 @@ class TestListBgpvpn(fakes.TestNeutronClientBgpvpn):
headers, data = self.cmd.take_action(parsed_args)
self.neutronclient.list_bgpvpns.assert_called_once_with(
self.networkclient.bgpvpns.assert_called_once_with(
name=name,
type=layer_type)
self.assertEqual(headers, list(headers_short))
@ -506,11 +501,13 @@ class TestShowBgpvpn(fakes.TestNeutronClientBgpvpn):
def setUp(self):
super(TestShowBgpvpn, self).setUp()
self.cmd = bgpvpn.ShowBgpvpn(self.app, self.namespace)
self.networkclient.find_bgpvpn = mock.Mock(
side_effect=lambda name_or_id: {'id': name_or_id})
def test_show_bgpvpn(self):
fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn()
self.neutronclient.show_bgpvpn = mock.Mock(
return_value={constants.BGPVPN: fake_bgpvpn})
fake_bgpvpn = fakes.create_one_bgpvpn()
self.networkclient.get_bgpvpn = mock.Mock(
return_value=fake_bgpvpn)
arglist = [
fake_bgpvpn['id'],
]
@ -522,7 +519,6 @@ class TestShowBgpvpn(fakes.TestNeutronClientBgpvpn):
headers, data = self.cmd.take_action(parsed_args)
self.neutronclient.show_bgpvpn.assert_called_once_with(
self.networkclient.get_bgpvpn.assert_called_once_with(
fake_bgpvpn['id'])
self.assertEqual(sorted_headers, headers)
self.assertItemEqual(_get_data(fake_bgpvpn), data)
self.assertEqual(sorted(sorted_columns), sorted(headers))

View File

@ -14,7 +14,6 @@
# under the License.
#
import copy
import operator
from unittest import mock
@ -55,15 +54,21 @@ def _get_data(attrs, columns=sorted_columns):
class TestCreateResAssoc(fakes.TestNeutronClientBgpvpn):
def setUp(self):
super(TestCreateResAssoc, self).setUp()
self.networkclient.find_bgpvpn = mock.Mock(
side_effect=lambda name_or_id: {'id': name_or_id})
self.networkclient.find_fake_resource = mock.Mock(
side_effect=lambda name_or_id: {'id': name_or_id})
self.cmd = fakes.CreateBgpvpnFakeResAssoc(self.app, self.namespace)
def test_create_resource_association(self):
fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn()
fake_res = fakes.FakeResource.create_one_resource()
fake_res_assoc = fakes.FakeResAssoc.create_one_resource_association(
fake_bgpvpn = fakes.create_one_bgpvpn()
fake_res = fakes.create_one_resource()
fake_res_assoc = fakes.create_one_resource_association(
fake_res)
self.neutronclient.create_bgpvpn_fake_resource_assoc = mock.Mock(
return_value={fakes.BgpvpnFakeAssoc._resource: fake_res_assoc})
self.networkclient.create_bgpvpn_fake_resource_association = mock.Mock(
return_value=fake_res_assoc)
self.networkclient.find_bgpvpn_fake_resource_association = mock.Mock(
side_effect=lambda name_or_id: {'id': name_or_id})
arglist = [
fake_bgpvpn['id'],
fake_res['id'],
@ -79,14 +84,16 @@ class TestCreateResAssoc(fakes.TestNeutronClientBgpvpn):
cols, data = self.cmd.take_action(parsed_args)
fake_res_assoc_call = copy.deepcopy(fake_res_assoc)
fake_res_assoc_call.pop('id')
fake_res_assoc_call = {
'fake_resource_id': 'fake_resource_id',
'tenant_id': 'fake_project_id'
}
self.neutronclient.create_bgpvpn_fake_resource_assoc.\
self.networkclient.create_bgpvpn_fake_resource_association.\
assert_called_once_with(
fake_bgpvpn['id'],
{fakes.BgpvpnFakeAssoc._resource: fake_res_assoc_call})
self.assertEqual(sorted_headers, cols)
**fake_res_assoc_call)
self.assertEqual(sorted_columns, cols)
self.assertEqual(_get_data(fake_res_assoc), data)
@ -96,11 +103,11 @@ class TestSetResAssoc(fakes.TestNeutronClientBgpvpn):
self.cmd = fakes.SetBgpvpnFakeResAssoc(self.app, self.namespace)
def test_set_resource_association(self):
fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn()
fake_res = fakes.FakeResource.create_one_resource()
fake_res_assoc = fakes.FakeResAssoc.create_one_resource_association(
fake_bgpvpn = fakes.create_one_bgpvpn()
fake_res = fakes.create_one_resource()
fake_res_assoc = fakes.create_one_resource_association(
fake_res)
self.neutronclient.update_bgpvpn_fake_resource_assoc = mock.Mock(
self.networkclient.update_bgpvpn_fake_resource_assoc = mock.Mock(
return_value={fakes.BgpvpnFakeAssoc._resource: fake_res_assoc})
arglist = [
fake_res_assoc['id'],
@ -115,7 +122,7 @@ class TestSetResAssoc(fakes.TestNeutronClientBgpvpn):
result = self.cmd.take_action(parsed_args)
self.neutronclient.update_bgpvpn_fake_resource_assoc.\
self.networkclient.update_bgpvpn_fake_resource_assoc.\
assert_not_called()
self.assertIsNone(result)
@ -123,14 +130,17 @@ class TestSetResAssoc(fakes.TestNeutronClientBgpvpn):
class TestDeleteResAssoc(fakes.TestNeutronClientBgpvpn):
def setUp(self):
super(TestDeleteResAssoc, self).setUp()
self.networkclient.find_bgpvpn = mock.Mock(
side_effect=lambda name_or_id: {'id': name_or_id})
self.cmd = fakes.DeleteBgpvpnFakeResAssoc(self.app, self.namespace)
def test_delete_one_association(self):
fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn()
fake_res = fakes.FakeResource.create_one_resource()
fake_res_assoc = fakes.FakeResAssoc.create_one_resource_association(
fake_bgpvpn = fakes.create_one_bgpvpn()
fake_res = fakes.create_one_resource()
fake_res_assoc = fakes.create_one_resource_association(
fake_res)
self.neutronclient.delete_bgpvpn_fake_resource_assoc = mock.Mock()
self.networkclient.delete_bgpvpn_fake_resource_association = \
mock.Mock()
arglist = [
fake_res_assoc['id'],
fake_bgpvpn['id'],
@ -144,21 +154,21 @@ class TestDeleteResAssoc(fakes.TestNeutronClientBgpvpn):
result = self.cmd.take_action(parsed_args)
self.neutronclient.delete_bgpvpn_fake_resource_assoc.\
self.networkclient.delete_bgpvpn_fake_resource_association.\
assert_called_once_with(fake_bgpvpn['id'], fake_res_assoc['id'])
self.assertIsNone(result)
def test_delete_multi_bpgvpn(self):
count = 3
fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn()
fake_res = fakes.FakeResource.create_resources(count=count)
fake_res_assocs = fakes.FakeResAssoc.create_resource_associations(
fake_bgpvpn = fakes.create_one_bgpvpn()
fake_res = fakes.create_resources(count=count)
fake_res_assocs = fakes.create_resource_associations(
fake_res)
fake_res_assoc_ids = [
fake_res_assoc['id'] for fake_res_assoc in
fake_res_assocs[fakes.BgpvpnFakeAssoc._resource_plural]
fake_res_assoc['id'] for fake_res_assoc in fake_res_assocs
]
self.neutronclient.delete_bgpvpn_fake_resource_assoc = mock.Mock()
self.networkclient.delete_bgpvpn_fake_resource_association = \
mock.Mock()
arglist = \
fake_res_assoc_ids + [
fake_bgpvpn['id']
@ -172,25 +182,26 @@ class TestDeleteResAssoc(fakes.TestNeutronClientBgpvpn):
result = self.cmd.take_action(parsed_args)
self.neutronclient.delete_bgpvpn_fake_resource_assoc.assert_has_calls(
[mock.call(fake_bgpvpn['id'], id) for id in fake_res_assoc_ids])
self.networkclient.delete_bgpvpn_fake_resource_association.\
assert_has_calls([
mock.call(
fake_bgpvpn['id'], id) for id in fake_res_assoc_ids])
self.assertIsNone(result)
def test_delete_multi_bpgvpn_with_unknown(self):
count = 3
fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn()
fake_res = fakes.FakeResource.create_resources(count=count)
fake_res_assocs = fakes.FakeResAssoc.create_resource_associations(
fake_bgpvpn = fakes.create_one_bgpvpn()
fake_res = fakes.create_resources(count=count)
fake_res_assocs = fakes.create_resource_associations(
fake_res)
fake_res_assoc_ids = [
fake_res_assoc['id'] for fake_res_assoc in
fake_res_assocs[fakes.BgpvpnFakeAssoc._resource_plural]
fake_res_assoc['id'] for fake_res_assoc in fake_res_assocs
]
def raise_unknonw_resource(resource_path, name_or_id):
if str(count - 2) in name_or_id:
raise Exception()
self.neutronclient.delete_bgpvpn_fake_resource_assoc = mock.Mock(
self.networkclient.delete_bgpvpn_fake_resource_association = mock.Mock(
side_effect=raise_unknonw_resource)
arglist = \
fake_res_assoc_ids + [
@ -206,22 +217,26 @@ class TestDeleteResAssoc(fakes.TestNeutronClientBgpvpn):
self.assertRaises(exceptions.CommandError, self.cmd.take_action,
parsed_args)
self.neutronclient.delete_bgpvpn_fake_resource_assoc.assert_has_calls(
[mock.call(fake_bgpvpn['id'], id) for id in fake_res_assoc_ids])
self.networkclient.delete_bgpvpn_fake_resource_association.\
assert_has_calls([
mock.call(fake_bgpvpn['id'], id) for id in fake_res_assoc_ids]
)
class TestListResAssoc(fakes.TestNeutronClientBgpvpn):
def setUp(self):
super(TestListResAssoc, self).setUp()
self.networkclient.find_bgpvpn = mock.Mock(
side_effect=lambda name_or_id: {'id': name_or_id})
self.cmd = fakes.ListBgpvpnFakeResAssoc(self.app, self.namespace)
def test_list_bgpvpn_associations(self):
count = 3
fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn()
fake_res = fakes.FakeResource.create_resources(count=count)
fake_res_assocs = fakes.FakeResAssoc.create_resource_associations(
fake_bgpvpn = fakes.create_one_bgpvpn()
fake_res = fakes.create_resources(count=count)
fake_res_assocs = fakes.create_resource_associations(
fake_res)
self.neutronclient.list_bgpvpn_fake_resource_assocs = mock.Mock(
self.networkclient.bgpvpn_fake_resource_associations = mock.Mock(
return_value=fake_res_assocs)
arglist = [
fake_bgpvpn['id'],
@ -234,21 +249,21 @@ class TestListResAssoc(fakes.TestNeutronClientBgpvpn):
headers, data = self.cmd.take_action(parsed_args)
self.neutronclient.list_bgpvpn_fake_resource_assocs.\
self.networkclient.bgpvpn_fake_resource_associations.\
assert_called_once_with(fake_bgpvpn['id'], retrieve_all=True)
self.assertEqual(headers, list(headers_short))
self.assertEqual(
list(data),
[_get_data(fake_res_assoc, columns_short) for fake_res_assoc
in fake_res_assocs[fakes.BgpvpnFakeAssoc._resource_plural]])
in fake_res_assocs])
def test_list_bgpvpn_associations_long_mode(self):
count = 3
fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn()
fake_res = fakes.FakeResource.create_resources(count=count)
fake_res_assocs = fakes.FakeResAssoc.create_resource_associations(
fake_bgpvpn = fakes.create_one_bgpvpn()
fake_res = fakes.create_resources(count=count)
fake_res_assocs = fakes.create_resource_associations(
fake_res)
self.neutronclient.list_bgpvpn_fake_resource_assocs = mock.Mock(
self.networkclient.bgpvpn_fake_resource_associations = mock.Mock(
return_value=fake_res_assocs)
arglist = [
'--long',
@ -263,27 +278,29 @@ class TestListResAssoc(fakes.TestNeutronClientBgpvpn):
headers, data = self.cmd.take_action(parsed_args)
self.neutronclient.list_bgpvpn_fake_resource_assocs.\
self.networkclient.bgpvpn_fake_resource_associations.\
assert_called_once_with(fake_bgpvpn['id'], retrieve_all=True)
self.assertEqual(headers, list(headers_long))
self.assertEqual(
list(data),
[_get_data(fake_res_assoc, columns_long) for fake_res_assoc
in fake_res_assocs[fakes.BgpvpnFakeAssoc._resource_plural]])
in fake_res_assocs])
class TestShowResAssoc(fakes.TestNeutronClientBgpvpn):
def setUp(self):
super(TestShowResAssoc, self).setUp()
self.networkclient.find_bgpvpn = mock.Mock(
side_effect=lambda name_or_id: {'id': name_or_id})
self.cmd = fakes.ShowBgpvpnFakeResAssoc(self.app, self.namespace)
def test_show_resource_association(self):
fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn()
fake_res = fakes.FakeResource.create_one_resource()
fake_res_assoc = fakes.FakeResAssoc.create_one_resource_association(
fake_bgpvpn = fakes.create_one_bgpvpn()
fake_res = fakes.create_one_resource()
fake_res_assoc = fakes.create_one_resource_association(
fake_res)
self.neutronclient.show_bgpvpn_fake_resource_assoc = mock.Mock(
return_value={fakes.BgpvpnFakeAssoc._resource: fake_res_assoc})
self.networkclient.get_bgpvpn_fake_resource_association = mock.Mock(
return_value=fake_res_assoc)
arglist = [
fake_res_assoc['id'],
fake_bgpvpn['id'],
@ -295,9 +312,9 @@ class TestShowResAssoc(fakes.TestNeutronClientBgpvpn):
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
headers, data = self.cmd.take_action(parsed_args)
columns, data = self.cmd.take_action(parsed_args)
self.neutronclient.show_bgpvpn_fake_resource_assoc.\
self.networkclient.get_bgpvpn_fake_resource_association.\
assert_called_once_with(fake_bgpvpn['id'], fake_res_assoc['id'])
self.assertEqual(sorted_headers, headers)
self.assertEqual(sorted_columns, columns)
self.assertEqual(data, _get_data(fake_res_assoc))

View File

@ -14,7 +14,6 @@
# under the License.
#
import copy
import operator
from unittest import mock
@ -56,8 +55,12 @@ class TestCreateRouterAssoc(fakes.TestNeutronClientBgpvpn):
def setUp(self):
super(TestCreateRouterAssoc, self).setUp()
self.cmd = fakes.CreateBgpvpnFakeRouterAssoc(self.app, self.namespace)
self.fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn()
self.fake_router = fakes.FakeResource.create_one_resource()
self.fake_bgpvpn = fakes.create_one_bgpvpn()
self.fake_router = fakes.create_one_resource()
self.networkclient.find_bgpvpn = mock.Mock(
side_effect=lambda name_or_id: {'id': name_or_id})
self.networkclient.find_fake_resource = mock.Mock(
side_effect=lambda name_or_id: {'id': name_or_id})
def _build_args(self, param=None):
arglist_base = [
@ -89,20 +92,26 @@ class TestCreateRouterAssoc(fakes.TestNeutronClientBgpvpn):
cols, data = self.cmd.take_action(parsed_args)
fake_res_assoc_call = copy.deepcopy(fake_res_assoc)
fake_res_assoc_call.pop('id')
fake_res_assoc_call = {
'fake_resource_id': 'fake_resource_id',
'tenant_id': 'fake_project_id'
}
for key, value in verifylist:
if value not in fake_res_assoc_call.values():
fake_res_assoc_call[key] = value
fake_res_assoc_call.pop('bgpvpn')
self.neutronclient.create_bgpvpn_fake_resource_assoc.\
self.networkclient.create_bgpvpn_fake_resource_association.\
assert_called_once_with(
self.fake_bgpvpn['id'],
{fakes.BgpvpnFakeRouterAssoc._resource: fake_res_assoc_call})
**fake_res_assoc_call)
return cols, data
def test_create_router_association(self):
fake_res_assoc = fakes.FakeResAssoc.create_one_resource_association(
def test_create_router_associationx(self):
fake_res_assoc = fakes.create_one_resource_association(
self.fake_router)
self.neutronclient.create_bgpvpn_fake_resource_assoc = mock.Mock(
self.networkclient.create_bgpvpn_fake_resource_association = mock.Mock(
return_value={
fakes.BgpvpnFakeRouterAssoc._resource: fake_res_assoc,
'advertise_extra_routes': True})
@ -116,37 +125,35 @@ class TestCreateRouterAssoc(fakes.TestNeutronClientBgpvpn):
fake_res_assoc, arglist, verifylist)
def test_create_router_association_advertise(self):
fake_res_assoc = fakes.FakeResAssoc.create_one_resource_association(
fake_res_assoc = fakes.create_one_resource_association(
self.fake_router,
{'advertise_extra_routes': True})
self.neutronclient.create_bgpvpn_fake_resource_assoc = mock.Mock(
return_value={
fakes.BgpvpnFakeRouterAssoc._resource: fake_res_assoc})
self.networkclient.create_bgpvpn_fake_resource_association = mock.Mock(
return_value=fake_res_assoc)
arglist = self._build_args('--advertise_extra_routes')
verifylist = self._build_verify_list(('advertise_extra_routes', True))
cols, data = self._exec_create_router_association(
fake_res_assoc, arglist, verifylist)
self.assertEqual(sorted_headers, cols)
self.assertEqual(sorted_columns, cols)
self.assertEqual(_get_data(fake_res_assoc), data)
def test_create_router_association_no_advertise(self):
fake_res_assoc = fakes.FakeResAssoc.create_one_resource_association(
fake_res_assoc = fakes.create_one_resource_association(
self.fake_router,
{'advertise_extra_routes': False})
self.neutronclient.create_bgpvpn_fake_resource_assoc = mock.Mock(
return_value={
fakes.BgpvpnFakeRouterAssoc._resource: fake_res_assoc})
self.networkclient.create_bgpvpn_fake_resource_association = mock.Mock(
return_value=fake_res_assoc)
arglist = self._build_args('--no-advertise_extra_routes')
verifylist = self._build_verify_list(('advertise_extra_routes', False))
cols, data = self._exec_create_router_association(
fake_res_assoc, arglist, verifylist)
self.assertEqual(sorted_headers, cols)
self.assertEqual(sorted_columns, cols)
self.assertEqual(_get_data(fake_res_assoc), data)
def test_create_router_association_advertise_fault(self):
@ -172,8 +179,10 @@ class TestSetRouterAssoc(fakes.TestNeutronClientBgpvpn):
def setUp(self):
super(TestSetRouterAssoc, self).setUp()
self.cmd = fakes.SetBgpvpnFakeRouterAssoc(self.app, self.namespace)
self.fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn()
self.fake_router = fakes.FakeResource.create_one_resource()
self.fake_bgpvpn = fakes.create_one_bgpvpn()
self.fake_router = fakes.create_one_resource()
self.networkclient.find_bgpvpn = mock.Mock(
side_effect=lambda name_or_id: {'id': name_or_id})
def _build_args(self, fake_res_assoc, param=None):
arglist_base = [
@ -197,10 +206,11 @@ class TestSetRouterAssoc(fakes.TestNeutronClientBgpvpn):
return verifylist
def test_set_router_association_no_advertise(self):
fake_res_assoc = fakes.FakeResAssoc.create_one_resource_association(
fake_res_assoc = fakes.create_one_resource_association(
self.fake_router,
{'advertise_extra_routes': True})
self.neutronclient.update_bgpvpn_fake_resource_assoc = mock.Mock()
self.networkclient.update_bgpvpn_fake_resource_association = \
mock.Mock()
arglist = self._build_args(
fake_res_assoc,
@ -213,25 +223,20 @@ class TestSetRouterAssoc(fakes.TestNeutronClientBgpvpn):
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
fake_res_assoc_call = copy.deepcopy(fake_res_assoc)
fake_res_assoc_call.pop('id')
self.neutronclient.update_bgpvpn_fake_resource_assoc.\
self.networkclient.update_bgpvpn_fake_resource_association.\
assert_called_once_with(
self.fake_bgpvpn['id'],
fake_res_assoc['id'],
{
fakes.BgpvpnFakeRouterAssoc._resource: {
'advertise_extra_routes': False
}
})
**{'advertise_extra_routes': False}
)
self.assertIsNone(result)
def test_set_router_association_advertise(self):
fake_res_assoc = fakes.FakeResAssoc.create_one_resource_association(
fake_res_assoc = fakes.create_one_resource_association(
self.fake_router,
{'advertise_extra_routes': False})
self.neutronclient.update_bgpvpn_fake_resource_assoc = mock.Mock()
self.networkclient.update_bgpvpn_fake_resource_association = \
mock.Mock()
arglist = self._build_args(
fake_res_assoc,
@ -244,18 +249,12 @@ class TestSetRouterAssoc(fakes.TestNeutronClientBgpvpn):
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
fake_res_assoc_call = copy.deepcopy(fake_res_assoc)
fake_res_assoc_call.pop('id')
self.neutronclient.update_bgpvpn_fake_resource_assoc.\
self.networkclient.update_bgpvpn_fake_resource_association.\
assert_called_once_with(
self.fake_bgpvpn['id'],
fake_res_assoc['id'],
{
fakes.BgpvpnFakeRouterAssoc._resource: {
'advertise_extra_routes': True
}
})
**{'advertise_extra_routes': True}
)
self.assertIsNone(result)
@ -263,15 +262,17 @@ class TestShowRouterAssoc(fakes.TestNeutronClientBgpvpn):
def setUp(self):
super(TestShowRouterAssoc, self).setUp()
self.cmd = fakes.ShowBgpvpnFakeRouterAssoc(self.app, self.namespace)
self.networkclient.find_bgpvpn = mock.Mock(
side_effect=lambda name_or_id: {'id': name_or_id})
def test_show_router_association(self):
fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn()
fake_res = fakes.FakeResource.create_one_resource()
fake_res_assoc = fakes.FakeResAssoc.create_one_resource_association(
fake_bgpvpn = fakes.create_one_bgpvpn()
fake_res = fakes.create_one_resource()
fake_res_assoc = fakes.create_one_resource_association(
fake_res,
{'advertise_extra_routes': True})
self.neutronclient.show_bgpvpn_fake_resource_assoc = mock.Mock(
return_value={fakes.BgpvpnFakeAssoc._resource: fake_res_assoc})
self.networkclient.get_bgpvpn_fake_resource_association = mock.Mock(
return_value=fake_res_assoc)
arglist = [
fake_res_assoc['id'],
fake_bgpvpn['id'],
@ -283,9 +284,9 @@ class TestShowRouterAssoc(fakes.TestNeutronClientBgpvpn):
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
headers, data = self.cmd.take_action(parsed_args)
cols, data = self.cmd.take_action(parsed_args)
self.neutronclient.show_bgpvpn_fake_resource_assoc.\
self.networkclient.get_bgpvpn_fake_resource_association.\
assert_called_once_with(fake_bgpvpn['id'], fake_res_assoc['id'])
self.assertEqual(sorted_headers, headers)
self.assertEqual(sorted_columns, cols)
self.assertEqual(data, _get_data(fake_res_assoc))

View File

@ -10,7 +10,7 @@ cliff>=3.4.0 # Apache-2.0
debtcollector>=1.2.0 # Apache-2.0
iso8601>=0.1.11 # MIT
netaddr>=0.7.18 # BSD
openstacksdk>=1.0.0 # Apache-2.0
openstacksdk>=1.0.2 # Apache-2.0
osc-lib>=1.12.0 # Apache-2.0
oslo.i18n>=3.15.3 # Apache-2.0
oslo.log>=3.36.0 # Apache-2.0