Add unit tests

Add unit tests in order to increase the coverage rate.

Change-Id: I305fc3e2d79aee951e4a36a710383b2646665efa
This commit is contained in:
Shinji YANAGIDA 2016-04-12 09:37:48 +09:00
parent 1e8bf7394e
commit 695d33bd26
10 changed files with 456 additions and 1 deletions

View File

@ -13,6 +13,7 @@
# under the License.
from mock import patch
from oslo_config import cfg
from networking_nec.nwa.agent import nwa_agent
from networking_nec.tests.unit.nwa.agent import base
@ -34,6 +35,23 @@ class TestNECNWANeutronAgentAsNwaClient(base.TestNWAAgentBase):
self.assertIsNotNone(self.agent.callback_nwa)
self.assertIsNotNone(self.agent.callback_proxy)
@patch('neutron.common.rpc.Connection')
@patch('neutron.agent.rpc.PluginReportStateAPI')
@patch('networking_nec.nwa.l2.rpc.tenant_binding_api.'
'TenantBindingServerRpcApi')
def test__setup_rpc_no_report_interval(self, f1, f2, f3):
self.agent.conf.NWA.lbaas_driver = True
self.agent.conf.NWA.fwaas_driver = True
cfg.CONF.AGENT.report_interval = 0
self.agent.setup_rpc()
self.assertIsNotNone(self.agent.host)
self.assertIsNotNone(self.agent.agent_id)
self.assertIsNotNone(self.agent.context)
self.assertIsNotNone(self.agent.nwa_l2_rpc)
self.assertIsNotNone(self.agent.state_rpc)
self.assertIsNotNone(self.agent.callback_nwa)
self.assertIsNotNone(self.agent.callback_proxy)
def test__report_state(self):
self.assertIsNone(self.agent._report_state())

View File

@ -56,6 +56,25 @@ class TestAgentProxyL2(base.TestNWAAgentBase):
ret_val = e.value
self.assertEqual(ret_val, nwa_data1)
def test__create_tenant_nw_with_key(self):
tenant_id = '844eb55f21e84a289e9c22098d387e5d'
nwa_tenant_id = 'DC1_' + tenant_id
resource_group_name = 'OpenStack/DC1/APP'
nwa_data1 = {proxy_l2.KEY_CREATE_TENANT_NW: True}
nwa_info = {
'resource_group_name': resource_group_name,
'resource_group_name_nw': resource_group_name,
}
result = self.agent.proxy_l2._create_tenant_nw(
mock.sentinel.context,
tenant_id=tenant_id,
nwa_tenant_id=nwa_tenant_id,
resource_group_name=resource_group_name,
nwa_data=nwa_data1,
nwa_info=nwa_info,
)
self.assertIsNone(result)
def test__create_vlan_succeed1(self):
tenant_id = '844eb55f21e84a289e9c22098d387e5d'
nwa_tenant_id = 'DC1_' + '844eb55f21e84a289e9c22098d387e5d'
@ -75,6 +94,27 @@ class TestAgentProxyL2(base.TestNWAAgentBase):
exp_data = load_data_file('expected_proxy_create_vlan_succeed1.json')
self.assertDictEqual(exp_data, result)
def test__create_vlan_succeed2(self):
net_id = '546a8551-5c2b-4050-a769-cc3c962fc5cf'
vlan_id_key = 'VLAN_' + net_id
tenant_id = '844eb55f21e84a289e9c22098d387e5d'
nwa_tenant_id = 'DC1_' + '844eb55f21e84a289e9c22098d387e5d'
# resource_group_name = 'OpenStack/DC1/APP'
nwa_data = {vlan_id_key: 'net-uuid-1'}
nwa_info = load_data_file('add_router_interface_nwa_info.json')
ret_vln = load_data_file('create_vlan_result.json')
ret_vln['resultdata']['VlanID'] = '300'
ret_vln['resultdata'][net_id] = 'net-uuid-1'
self.nwacli.l2.create_vlan.return_value = (200, ret_vln)
result = self.agent.proxy_l2._create_vlan(
mock.sentinel.context,
tenant_id=tenant_id,
nwa_tenant_id=nwa_tenant_id,
nwa_info=nwa_info,
nwa_data=nwa_data
)
self.assertDictEqual(nwa_data, result)
def test__create_vlan_fail1(self):
tenant_id = '844eb55f21e84a289e9c22098d387e5d'
nwa_tenant_id = 'DC1_' + '844eb55f21e84a289e9c22098d387e5d'

View File

@ -62,6 +62,12 @@ class TestCommonUtils(base.BaseTestCase):
# json_file has priority. json_str passed here is not evaluated.
self._test_load_json_from_file_from_file(json_str='invalid')
def test_load_json_from_file_no_json_file_abspath(self):
json_file = 'test.json'
self.assertRaises(cfg.Error,
nwa_com_utils.load_json_from_file,
'test', json_file, None, [])
def test_load_json_from_file_with_invalid_json_file(self):
json_file = self.get_temp_file_path('test.json')
with open(json_file, 'w') as f:

View File

@ -15,6 +15,7 @@
from mock import MagicMock
from mock import patch
from neutron.common import constants as neutron_const
from neutron import context
from neutron.extensions import providernet as prov_net
from neutron.tests.unit import testlib_api
@ -23,6 +24,7 @@ from oslo_config import cfg
from oslo_serialization import jsonutils
from networking_nec.nwa.common import constants as nwa_const
from networking_nec.nwa.common import exceptions as nwa_exc
from networking_nec.nwa.l2.drivers import mech_necnwa as mech
@ -195,6 +197,17 @@ class TestNECNWAMechanismDriver(TestMechNwa):
self.context._port['device_owner'] = 'compute:DC01_KVM01_ZONE01'
self.driver.create_port_precommit(self.context)
def test_create_port_precommit_group_not_found(self):
self.driver.resource_groups = [
{
"physical_network": "Common/App/Pod3",
"device_owner": constants.DEVICE_OWNER_ROUTER_GW,
"ResourceGroupName": "Common/App/Pod3"
}
]
self.assertRaises(nwa_exc.ResourceGroupNameNotFound,
self.driver.create_port_precommit, self.context)
@patch('networking_nec.nwa.l2.db_api.add_nwa_tenant_binding')
@patch('networking_nec.nwa.l2.db_api.get_nwa_tenant_binding')
def test_create_port_precommit_owner_router_intf(self, gntb, antb):
@ -237,6 +250,30 @@ class TestNECNWAMechanismDriver(TestMechNwa):
self.context._port['device_owner'] = device_owner
self.driver.update_port_precommit(self.context)
@patch('networking_nec.nwa.l2.utils.portcontext_to_nwa_info')
@patch('networking_nec.nwa.l2.db_api.get_nwa_tenant_binding')
def test_update_port_precommit_current_none(self, gntb, ptni):
self.context.current = self.context._port
self.context.current['device_id'] = None
self.context.current['device_owner'] = None
self.context.original[
'device_owner'] = constants.DEVICE_OWNER_ROUTER_INTF
self.context.original['device_id'] = 'uuid-device_id_000'
gntb.return_value = self._get_nwa_tenant_binding({
'CreateTenant': True,
'NWA_tenant_id': 'RegionOnetenant201',
'DEV_uuid-device_id_100_61': constants.DEVICE_OWNER_ROUTER_INTF,
'DEV_uuid-device_id_100_61_TYPE': nwa_const.NWA_DEVICE_TFW,
'DEV_uuid-device_id_100_61_ip_address': '192.168.120.1',
'DEV_uuid-device_id_100_61_mac_address': '12:34:56:78:9a:bc'}
)
ptni.return_value = {
'tenant_id': 'Tenant1',
'nwa_tenant_id': 'RegionOnetenant201',
'resource_group_name': 'Common/App/Pod3',
'physical_network': 'Common/App/Pod3'}
self.driver.update_port_precommit(self.context)
@patch('networking_nec.nwa.l2.db_api.set_nwa_tenant_binding')
@patch('networking_nec.nwa.l2.db_api.get_nwa_tenant_binding')
def test_delete_port_precommit_owner_router_interface(self, gntb, sntb):
@ -353,6 +390,72 @@ class TestNECNWAMechanismDriver(TestMechNwa):
)
self.driver.delete_port_precommit(self.context)
@patch('networking_nec.nwa.l2.db_api.set_nwa_tenant_binding')
@patch('networking_nec.nwa.l2.db_api.get_nwa_tenant_binding')
def test_delete_port_precommit_owner_floatingip(self, gntb, sntb):
floatingip = constants.DEVICE_OWNER_FLOATINGIP
self.context._port['device_owner'] = floatingip
gntb.return_value = self._get_nwa_tenant_binding({
'CreateTenant': 1,
'CreateTenantNW': True,
'NWA_tenant_id': 'RegionOnetenant201',
'DEV_uuid-device_id_100': 'device_id',
'DEV_uuid-device_id_100_device_owner': floatingip,
'DEV_uuid-device_id_100_61_TYPE': nwa_const.NWA_DEVICE_TFW,
'DEV_uuid-device_id_100_61': 'public001',
'DEV_uuid-device_id_100_61_ip_address': '172.16.1.23',
'DEV_uuid-device_id_100_63_mac_address': '12:34:56:78:9a:bc'}
)
self.driver.delete_port_precommit(self.context)
@patch('networking_nec.nwa.l2.db_api.set_nwa_tenant_binding')
@patch('networking_nec.nwa.l2.db_api.get_nwa_tenant_binding')
def test_delete_port_precommit_owner_none(self, gntb, sntb):
self.context._port['device_owner'] = ''
self.context._port['device_id'] = ''
gntb.return_value = self._get_nwa_tenant_binding({
'CreateTenant': 1,
'CreateTenantNW': True,
'NWA_tenant_id': 'RegionOnetenant201',
'DEV_uuid-device_id_100': 'device_id',
'DEV_uuid-device_id_100_device_owner': '',
'DEV_uuid-device_id_100_61_TYPE': nwa_const.NWA_DEVICE_TFW,
'DEV_uuid-device_id_100_61': 'public001',
'DEV_uuid-device_id_100_61_ip_address': '172.16.1.23',
'DEV_uuid-device_id_100_63_mac_address': '12:34:56:78:9a:bc'}
)
self.driver.delete_port_precommit(self.context)
@patch('networking_nec.nwa.l2.utils.portcontext_to_nwa_info')
@patch('networking_nec.nwa.l2.db_api.get_nwa_tenant_binding')
def test_delete_port_precommit_owner_dhcp(self, gntb, ptni):
self.context._port['device_owner'] = constants.DEVICE_OWNER_DHCP
self.context._port[
'device_id'] = neutron_const.DEVICE_ID_RESERVED_DHCP_PORT
self.context._port['binding:host_id'] = 'myhost'
# _revert_dhcp_agent_device
gntb.return_value = self._get_nwa_tenant_binding({
'CreateTenant': 1,
'CreateTenantNW': True,
'NWA_tenant_id': 'RegionOnetenant201',
'DEV_uuid-device_id_100': 'device_id',
'DEV_uuid-device_id_100_device_owner': constants.DEVICE_OWNER_DHCP,
'DEV_uuid-device_id_100_61_TYPE': nwa_const.NWA_DEVICE_TFW,
'DEV_uuid-device_id_100_61': 'public001',
'DEV_uuid-device_id_100_61_ip_address': '172.16.1.23',
'DEV_uuid-device_id_100_63_mac_address': '12:34:56:78:9a:bc'}
)
ptni.return_value = {
'tenant_id': 'Tenant1',
'nwa_tenant_id': 'RegionOnetenant201',
'device': {
'owner': constants.DEVICE_OWNER_DHCP,
'id': 'device_id'},
'resource_group_name': 'Common/App/Pod3',
'physical_network': 'Common/App/Pod3'}
self.driver.delete_port_precommit(self.context)
@patch('networking_nec.nwa.l2.db_api.set_nwa_tenant_binding')
@patch('networking_nec.nwa.l2.db_api.get_nwa_tenant_binding')
def test_delete_port_precommit_owner_compute_az(self, gntb, sntb):
@ -402,11 +505,36 @@ class TestNECNWAMechanismDriver(TestMechNwa):
self.context, self.network_segments[1], self.host_agents[0])
self.assertEqual(rb, True)
# in physical_network
self.context.network.current[
'provider:physical_network'] = 'Common/App/Pod3'
self.context.network.current['provider:segmentation_id'] = 199
self.context.current = self.context._port
rb = self.driver.try_to_bind_segment_for_agent(
self.context, self.network_segments[1], self.host_agents[0])
self.assertEqual(rb, True)
# not in segment
rb = self.driver.try_to_bind_segment_for_agent(
self.context, self.network_segments[1], self.host_agents[1])
self.assertEqual(rb, False)
# device_owner is router_gw
self.context._port['device_owner'] = constants.DEVICE_OWNER_ROUTER_GW
rb = self.driver.try_to_bind_segment_for_agent(
self.context, self.network_segments[1], self.host_agents[0])
self.assertEqual(rb, False)
@patch('networking_nec.nwa.l2.db_api.get_nwa_tenant_binding')
def test_try_to_bind_segment_for_agent_in_segments(self, gntb):
# in segment
self.context._port['device_owner'] = 'network:dhcp'
self.context.network.current['segments'] = self.network_segments
self.context.current = self.context._port
rb = self.driver.try_to_bind_segment_for_agent(
self.context, self.network_segments[2], self.host_agents[0])
self.assertEqual(rb, True)
def test__bind_segment_to_vif_type(self):
pod3_eth1 = self.host_agents[0]
rb = self.driver._bind_segment_to_vif_type(self.context, pod3_eth1)
@ -421,6 +549,12 @@ class TestNECNWAMechanismDriver(TestMechNwa):
rb = self.driver._bind_segment_to_vif_type(self.context)
self.assertEqual(rb, True)
@patch('neutron.plugins.ml2.db.get_dynamic_segment')
def test__bind_segment_to_vif_type_dummy_segment_none(self, gds):
gds.return_value = None
rb = self.driver._bind_segment_to_vif_type(self.context)
self.assertEqual(rb, True)
@patch('networking_nec.nwa.l2.db_api.get_nwa_tenant_binding')
def _test__bind_port_nwa(self, gntb):
# if prov_net.PHYSICAL_NETWORK in self.context.network.current:

View File

@ -41,6 +41,7 @@ class TestNECNWAServerRpcCallbacks(base.BaseTestCase):
segment = {'segmentation_id': 1000,
portbindings.VIF_TYPE: 'vlan'}
vif_type = None
segment_id = 'seg-id-1000'
class PortContext(object):
bound_segment = None
@ -113,6 +114,49 @@ class TestNECNWAServerRpcCallbacks(base.BaseTestCase):
"sample"})
self.assertTrue(device)
@mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin.update_port_status')
@mock.patch('neutron.plugins.ml2.db.get_network_segments')
@mock.patch('neutron.db.api.get_session')
@mock.patch('networking_nec.nwa.l2.plugin.'
'NECNWAL2Plugin.get_bound_port_context')
@mock.patch('networking_nec.nwa.l2.plugin.'
'NECNWAL2Plugin._device_to_port_id')
@mock.patch('neutron.manager.NeutronManager.get_plugin')
def test__get_device_details_segment_zero(self, f1, f2, f3, f4, f5, f6):
rpc_context = mock.MagicMock()
f1.return_value = self.l2_plugin
f2.return_value = None
f3.current = True
f4.begin.return_value = None
f5.return_value = [{'segmentation_id': 0, 'id': 'seg-id'}]
device = self.rpc._get_device_details(rpc_context, kwargs={'test':
"sample"})
self.assertTrue(device)
@mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin.update_port_status')
@mock.patch('neutron.plugins.ml2.db.get_network_segments')
@mock.patch('neutron.db.api.get_session')
@mock.patch('networking_nec.nwa.l2.plugin.'
'NECNWAL2Plugin.get_bound_port_context')
@mock.patch('networking_nec.nwa.l2.plugin.'
'NECNWAL2Plugin._device_to_port_id')
@mock.patch('neutron.manager.NeutronManager.get_plugin')
def test__get_device_details_segment_no_zero(self, f1, f2, f3, f4, f5, f6):
rpc_context = mock.MagicMock()
f1.return_value = self.l2_plugin
f2.return_value = None
f3.current = True
f4.begin.return_value = None
f5.return_value = [{'segmentation_id': 1,
'id': 'seg-id',
'network_type': 'vlan',
'physical_network': 'OpenStack/DC1/APP'}]
device = self.rpc._get_device_details(rpc_context, kwargs={'test':
"sample"})
self.assertTrue(device)
@mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin.update_port_status')
@mock.patch('neutron.plugins.ml2.db.get_network_segments')
@mock.patch('neutron.db.api.get_session')
@ -173,6 +217,7 @@ class TestNECNWAServerRpcCallbacks(base.BaseTestCase):
class PortBinding(object):
segment = None
vif_type = None
segment_id = None
f6.return_value = PortBinding()
@ -202,7 +247,6 @@ class TestNECNWAServerRpcCallbacks(base.BaseTestCase):
'network_type': 'vlan',
'physical_network': 'OpenStack/DC1/APP'}]
f6.return_value = self.port_binding
device = self.rpc.get_device_details(rpc_context,
kwargs={'test': "sample"})
self.assertTrue(device)

View File

@ -185,6 +185,12 @@ class TestAddNwaTenantBinding(testlib_api.SqlTestCaseLight):
self.ssn, self.tenant2, self.nwa_tenant2).value_json,
{self.key1: False})
def test_json_value_not_dict(self):
self.assertFalse(
db_api.add_nwa_tenant_binding(
self.ssn, self.tenant1, self.nwa_tenant1,
[self.value1, self.value2])) # list
class TestGetNwaTenantBinding(base.BaseTestCase):
def setUp(self):
@ -399,6 +405,12 @@ class TestTenantQueue(testlib_api.SqlTestCase):
ret = db_api.get_nwa_tenant_queue(self.ssn, self.tenant1)
self.assertFalse(ret) # not found
def test_add_tenant_queue_no_result_found(self):
session = MagicMock()
session.query().filter().all.side_effect = NoResultFound
self.assertFalse(
db_api.add_nwa_tenant_queue(session, self.tenant1))
def test_add_tenant_queue_detail(self):
ret = db_api.add_nwa_tenant_queue(
self.ssn, self.tenant1, 'nwa_%s' % self.tenant1, topic='foo')

View File

@ -45,6 +45,15 @@ class TestNECNWAL2Plugin(base.BaseTestCase):
self.l2_plugin = plugin.NECNWAL2Plugin()
def test_extend_network_dict_providor_no_id(self):
context = MagicMock()
network = {}
result = self.l2_plugin._extend_network_dict_provider(context,
network)
self.assertIsNone(result)
@patch('neutron.plugins.ml2.db.get_network_segments')
def test_extend_network_dict_provider_segment_none(self, f1):
context = MagicMock()
@ -112,3 +121,118 @@ class TestNECNWAL2Plugin(base.BaseTestCase):
sorts, limit, marker,
page_reverse)
self.assertTrue(result)
@patch('networking_nec.nwa.l2.db_api.get_nwa_tenant_queue')
@patch('networking_nec.nwa.l2.plugin.NECNWAL2Plugin._is_alive_nwa_agent')
def test_create_nwa_agent_tenant_queue(self, f1, f2):
context = MagicMock()
tid = 'Tenant1'
f1.return_value = True
f2.return_value = None
self.l2_plugin._create_nwa_agent_tenant_queue(context, tid)
def test_create_nwa_agent_tenant_queue_not_alive(self):
context = MagicMock()
tid = 'Tenant1'
self.l2_plugin._create_nwa_agent_tenant_queue(context, tid)
@patch('neutron.plugins.ml2.plugin.Ml2Plugin.delete_network')
@patch('networking_nec.nwa.l2.plugin.'
'NECNWAL2Plugin._create_nwa_agent_tenant_queue')
@patch('neutron.plugins.ml2.plugin.Ml2Plugin.create_network')
def test_create_delete_network(self, f1, f2, f3):
context = MagicMock()
context.tenant_id = 'Tenant1'
network = {'id': '99f771b4-af69-45cc-942f-a76be4e8cd1d'}
result = self.l2_plugin.create_network(context, network)
self.assertTrue(result)
result = self.l2_plugin.delete_network(context, result.id)
self.assertTrue(result)
@patch('neutron.plugins.ml2.plugin.Ml2Plugin.create_port')
def test_create_port(self, f1):
context = MagicMock()
port = MagicMock()
result = self.l2_plugin.create_port(context, port)
self.assertTrue(result)
@patch('networking_nec.nwa.l2.rpc.nwa_agent_api.'
'NECNWAAgentApi.get_nwa_rpc_servers')
def test_get_nwa_topics(self, f1):
context = MagicMock()
id = 'Tenant1' # Tenant ID
f1.return_value = {'nwa_rpc_servers':
[{'tenant_id': 'Tenant1', 'topic': 'topic1'},
{'tenant_id': 'Tenant2', 'topic': 'topic2'},
{'tenant_id': 'Tenant1', 'topic': 'topic11'}]}
result = self.l2_plugin.get_nwa_topics(context, id)
self.assertEqual(len(result), 2)
@patch('networking_nec.nwa.l2.rpc.nwa_agent_api.'
'NECNWAAgentApi.get_nwa_rpc_servers')
def test_get_nwa_topics_not_dict(self, f1):
context = MagicMock()
id = 'Tenant1' # Tenant ID
f1.return_value = ['topic1', 'topic2'] # not dict
result = self.l2_plugin.get_nwa_topics(context, id)
self.assertEqual(len(result), 0)
def test_get_nwa_proxy(self):
tid = 'Tenant1'
result = self.l2_plugin.get_nwa_proxy(tid)
self.assertTrue(result)
def test_get_nwa_proxy_in_nwa_proxies(self):
tid = 'Tenant1'
proxy = MagicMock()
self.l2_plugin.nwa_proxies = {tid: proxy}
result = self.l2_plugin.get_nwa_proxy(tid)
self.assertEqual(result, proxy)
@patch('networking_nec.nwa.l2.plugin.NECNWAL2Plugin.get_nwa_topics')
def test_get_nwa_proxy_with_context(self, f1):
tid = 'Tenant1'
context = MagicMock()
f1.return_value = ['topic1']
result = self.l2_plugin.get_nwa_proxy(tid, context)
self.assertTrue(result)
@patch('networking_nec.nwa.l2.plugin.NECNWAL2Plugin.get_nwa_topics')
def test_get_nwa_proxy_no_topics(self, f1):
tid = 'Tenant1'
context = MagicMock()
f1.return_value = []
result = self.l2_plugin.get_nwa_proxy(tid, context)
self.assertTrue(result)
@patch('networking_nec.nwa.l2.plugin.NECNWAL2Plugin.get_agents')
def test_is_alive_nwa_agent(self, f1):
context = MagicMock()
f1.return_value = [{'alive': True}, {'alive': False}]
result = self.l2_plugin._is_alive_nwa_agent(context)
self.assertTrue(result)
@patch('networking_nec.nwa.l2.plugin.NECNWAL2Plugin.get_agents')
def test_is_alive_nwa_agent_not_alive(self, f1):
context = MagicMock()
f1.return_value = [{'alive': False}]
result = self.l2_plugin._is_alive_nwa_agent(context)
self.assertFalse(result)
def test_get_port_from_device(self):
context = MagicMock()
device = 'device'
self.l2_plugin.get_port_from_device(context, device)

View File

@ -13,6 +13,7 @@
# under the License.
from mock import MagicMock
from sqlalchemy.orm import exc as sa_exc
from neutron.tests import base
from oslo_config import cfg
@ -202,6 +203,20 @@ class TestPortcontextToNwaInfo(TestNwa):
self.assertEqual(rd['port']['ip'], p['fixed_ips'][0]['ip_address'])
self.assertEqual(rd['port']['mac'], p['mac_address'])
def test_portcontext_to_nwa_info_business_vlan(self):
# session in context
self.context.session = MagicMock()
# external network is not found
self.context.session.query().filter_by().\
one.side_effect = sa_exc.NoResultFound
self.context.network.current['name'] = 'BusinessVLAN_200'
self.context.network.current['id'] = 'Uuid-BusinessVLAN_200'
self.context.current = self.context._port
rd = nwa_l2_utils.portcontext_to_nwa_info(self.context,
self.resource_group)
self.assertIsInstance(rd, dict)
self.assertEqual(rd['network']['vlan_type'], 'BusinessVLAN')
def test_portcontext_to_nwa_info_original_port(self):
device_owner = 'do-1'
device_id = 'di-1'
@ -277,3 +292,10 @@ class test__getResourceGroupName(TestNwa):
rc = nwa_l2_utils._get_resource_group_name(self.context,
self.resource_group)
self.assertIsNone(rc)
self.context.current['device_owner'] = 'network:router_interface'
self.host_agent[0]['alive'] = False
self.resource_group = []
rc = nwa_l2_utils._get_resource_group_name(self.context,
self.resource_group)
self.assertIsNone(rc)

View File

@ -139,6 +139,25 @@ class TestNwaClientL2(test_client.TestNwaClientBase):
self.assertEqual(rj['status'], 'SUCCESS')
self.assertEqual(self.post.call_count, 1)
def test_create_vlan_without_open_nid(self):
vlan_type = 'BusinessVLAN'
ipaddr = '10.0.0.0'
mask = 24
rd, rj = self.nwa.l2.create_vlan(
TENANT_ID, ipaddr, mask, vlan_type
)
self.post.assert_called_once_with(
workflow.NwaWorkflow.path('CreateVLAN'),
{'TenantID': TENANT_ID,
'CreateNW_IPSubnetMask1': mask,
'CreateNW_IPSubnetAddress1': ipaddr,
'CreateNW_VlanType1': vlan_type})
self.assertEqual(rd, 200)
self.assertEqual(rj['status'], 'SUCCESS')
self.assertEqual(self.post.call_count, 1)
def test_delete_vlan(self):
vlan_name = 'LNW_BusinessVLAN_49'
vlan_type = 'BusinessVLAN'

View File

@ -68,6 +68,28 @@ class TestNwaClientL3(test_client.TestNwaClientBase):
self.assertEqual(rj['status'], 'SUCCESS')
self.assertEqual(self.post.call_count, 1)
def test_update_tenant_fw_without_connect(self):
device_name = 'TFW0'
device_type = 'TFW'
vlan_name = 'LNW_BusinessVLAN_49'
vlan_type = 'BusinessVLAN'
rd, rj = self.nwa.l3.update_tenant_fw(
TENANT_ID,
device_name, mock.sentinel.vlan_devaddr,
vlan_name, vlan_type
)
self.post.assert_called_once_with(
workflow.NwaWorkflow.path('UpdateTenantFW'),
{'TenantID': TENANT_ID,
'ReconfigNW_DeviceName1': device_name,
'ReconfigNW_DeviceType1': device_type,
'ReconfigNW_VlanLogicalName1': vlan_name,
'ReconfigNW_Vlan_DeviceAddress1': mock.sentinel.vlan_devaddr,
'ReconfigNW_VlanType1': vlan_type})
self.assertEqual(rd, 200)
self.assertEqual(rj['status'], 'SUCCESS')
self.assertEqual(self.post.call_count, 1)
def test_delete_tenant_fw(self):
device_name = 'TFW0'
device_type = 'TFW'
@ -131,3 +153,17 @@ class TestNwaClientL3(test_client.TestNwaClientBase):
self.assertEqual(rd, 200)
self.assertEqual(rj['status'], 'SUCCESS')
self.assertEqual(self.post.call_count, 1)
def test_update_nat(self):
fw_name = 'TFW8'
vlan_name = 'LNW_PublicVLAN_46'
vlan_type = 'PublicVLAN'
local_ip = '172.16.0.2'
global_ip = '10.0.0.10'
rd, rj = self.nwa.l3.update_nat(
TENANT_ID,
vlan_name, vlan_type, local_ip, global_ip, fw_name
)
self.assertEqual(rj['status'], 'SUCCESS')
self.assertEqual(self.post.call_count, 2)