remove deprecated usage of contextlib.nested

Change-Id: Ief52c8fa35ff5a8508c5fa51572067a614ac2aed
Closes-bug: #1453433
This commit is contained in:
Ivar Lazzaro 2015-05-09 11:42:29 -07:00
parent c66404c4b6
commit 07b0071108
5 changed files with 515 additions and 492 deletions

View File

@ -11,7 +11,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import webob.exc
from neutron.tests.unit.extensions import test_l3
@@ -73,50 +72,59 @@ class TestMappedGroupResourceAttrs(GroupPolicyMappingDbTestCase):
self.assertEqual(res.status_int, webob.exc.HTTPNoContent.code)
def test_create_delete_policy_target_group_with_subnets(self):
with contextlib.nested(self.subnet(cidr='10.10.1.0/24'),
self.subnet(cidr='10.10.2.0/24')) as (
subnet1, subnet2):
subnets = [subnet1['subnet']['id'], subnet2['subnet']['id']]
ptg = self.create_policy_target_group(subnets=subnets)
ptg_id = ptg['policy_target_group']['id']
self.assertEqual(sorted(subnets),
sorted(ptg['policy_target_group']['subnets']))
req = self.new_show_request('policy_target_groups', ptg_id,
fmt=self.fmt)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
self.assertEqual(sorted(subnets),
sorted(res['policy_target_group']['subnets']))
req = self.new_delete_request('policy_target_groups', ptg_id)
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, webob.exc.HTTPNoContent.code)
with self.subnet(cidr='10.10.1.0/24') as subnet1:
with self.subnet(cidr='10.10.2.0/24') as subnet2:
subnets = [subnet1['subnet']['id'], subnet2['subnet']['id']]
ptg = self.create_policy_target_group(subnets=subnets)
ptg_id = ptg['policy_target_group']['id']
self.assertEqual(sorted(subnets),
sorted(ptg['policy_target_group']['subnets']))
req = self.new_show_request('policy_target_groups', ptg_id,
fmt=self.fmt)
res = self.deserialize(
self.fmt, req.get_response(self.ext_api))
self.assertEqual(sorted(subnets),
sorted(res['policy_target_group']['subnets']))
req = self.new_delete_request('policy_target_groups', ptg_id)
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, webob.exc.HTTPNoContent.code)
def test_update_policy_target_group_subnets(self):
with contextlib.nested(self.subnet(cidr='10.10.1.0/24'),
self.subnet(cidr='10.10.2.0/24'),
self.subnet(cidr='10.10.3.0/24')) as (
subnet1, subnet2, subnet3):
orig_subnets = [subnet1['subnet']['id'], subnet2['subnet']['id']]
ptg = self.create_policy_target_group(subnets=orig_subnets)
ptg_id = ptg['policy_target_group']['id']
self.assertEqual(sorted(orig_subnets),
sorted(ptg['policy_target_group']['subnets']))
new_subnets = [subnet1['subnet']['id'], subnet3['subnet']['id']]
data = {'policy_target_group': {'subnets': new_subnets}}
req = self.new_update_request('policy_target_groups', data, ptg_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
self.assertEqual(sorted(new_subnets),
sorted(res['policy_target_group']['subnets']))
req = self.new_show_request('policy_target_groups', ptg_id,
fmt=self.fmt)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
self.assertEqual(sorted(new_subnets),
sorted(res['policy_target_group']['subnets']))
# REVISIT(rkukura): Remove delete once subnet() context
# manager is replaced with a function that does not delete
# the resource(s) that are created.
req = self.new_delete_request('policy_target_groups', ptg_id)
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, webob.exc.HTTPNoContent.code)
with self.subnet(cidr='10.10.1.0/24') as subnet1:
with self.subnet(cidr='10.10.2.0/24') as subnet2:
with self.subnet(cidr='10.10.3.0/24') as subnet3:
orig_subnets = [subnet1['subnet']['id'],
subnet2['subnet']['id']]
ptg = self.create_policy_target_group(subnets=orig_subnets)
ptg_id = ptg['policy_target_group']['id']
self.assertEqual(
sorted(orig_subnets),
sorted(ptg['policy_target_group']['subnets']))
new_subnets = [subnet1['subnet']['id'],
subnet3['subnet']['id']]
data = {'policy_target_group': {'subnets': new_subnets}}
req = self.new_update_request('policy_target_groups', data,
ptg_id)
res = self.deserialize(self.fmt,
req.get_response(self.ext_api))
self.assertEqual(
sorted(new_subnets),
sorted(res['policy_target_group']['subnets']))
req = self.new_show_request('policy_target_groups', ptg_id,
fmt=self.fmt)
res = self.deserialize(
self.fmt, req.get_response(self.ext_api))
self.assertEqual(
sorted(new_subnets),
sorted(res['policy_target_group']['subnets']))
# REVISIT(rkukura): Remove delete once subnet() context
# manager is replaced with a function that does not delete
# the resource(s) that are created.
req = self.new_delete_request('policy_target_groups',
ptg_id)
res = req.get_response(self.ext_api)
self.assertEqual(
res.status_int, webob.exc.HTTPNoContent.code)
def test_create_delete_l2_policy_with_network(self):
with self.network() as network:
@@ -132,45 +140,54 @@ class TestMappedGroupResourceAttrs(GroupPolicyMappingDbTestCase):
self.assertEqual(res.status_int, webob.exc.HTTPNoContent.code)
def test_create_delete_l3_policy_with_routers(self):
with contextlib.nested(self.router(), self.router()) as (router1,
router2):
routers = [router1['router']['id'], router2['router']['id']]
l3p = self.create_l3_policy(routers=routers)
l3p_id = l3p['l3_policy']['id']
self.assertEqual(sorted(routers),
sorted(l3p['l3_policy']['routers']))
req = self.new_show_request('l3_policies', l3p_id, fmt=self.fmt)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
self.assertEqual(sorted(routers),
sorted(res['l3_policy']['routers']))
req = self.new_delete_request('l3_policies', l3p_id)
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, webob.exc.HTTPNoContent.code)
with self.router() as router1:
with self.router() as router2:
routers = [router1['router']['id'], router2['router']['id']]
l3p = self.create_l3_policy(routers=routers)
l3p_id = l3p['l3_policy']['id']
self.assertEqual(sorted(routers),
sorted(l3p['l3_policy']['routers']))
req = self.new_show_request('l3_policies', l3p_id,
fmt=self.fmt)
res = self.deserialize(self.fmt,
req.get_response(self.ext_api))
self.assertEqual(sorted(routers),
sorted(res['l3_policy']['routers']))
req = self.new_delete_request('l3_policies', l3p_id)
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, webob.exc.HTTPNoContent.code)
def test_update_l3_policy_routers(self):
with contextlib.nested(self.router(), self.router(),
self.router()) as (router1, router2, router3):
orig_routers = [router1['router']['id'], router2['router']['id']]
l3p = self.create_l3_policy(routers=orig_routers)
l3p_id = l3p['l3_policy']['id']
self.assertEqual(sorted(orig_routers),
sorted(l3p['l3_policy']['routers']))
new_routers = [router1['router']['id'], router3['router']['id']]
data = {'l3_policy': {'routers': new_routers}}
req = self.new_update_request('l3_policies', data, l3p_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
self.assertEqual(sorted(new_routers),
sorted(res['l3_policy']['routers']))
req = self.new_show_request('l3_policies', l3p_id, fmt=self.fmt)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
self.assertEqual(sorted(new_routers),
sorted(res['l3_policy']['routers']))
# REVISIT(rkukura): Remove delete once router() context
# manager is replaced with a function that does not delete
# the resource(s) that are created.
req = self.new_delete_request('l3_policies', l3p_id)
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, webob.exc.HTTPNoContent.code)
with self.router() as router1:
with self.router() as router2:
with self.router() as router3:
orig_routers = [router1['router']['id'],
router2['router']['id']]
l3p = self.create_l3_policy(routers=orig_routers)
l3p_id = l3p['l3_policy']['id']
self.assertEqual(sorted(orig_routers),
sorted(l3p['l3_policy']['routers']))
new_routers = [router1['router']['id'],
router3['router']['id']]
data = {'l3_policy': {'routers': new_routers}}
req = self.new_update_request('l3_policies', data, l3p_id)
res = self.deserialize(self.fmt,
req.get_response(self.ext_api))
self.assertEqual(sorted(new_routers),
sorted(res['l3_policy']['routers']))
req = self.new_show_request('l3_policies',
l3p_id, fmt=self.fmt)
res = self.deserialize(
self.fmt, req.get_response(self.ext_api))
self.assertEqual(sorted(new_routers),
sorted(res['l3_policy']['routers']))
# REVISIT(rkukura): Remove delete once router() context
# manager is replaced with a function that does not delete
# the resource(s) that are created.
req = self.new_delete_request('l3_policies', l3p_id)
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int,
webob.exc.HTTPNoContent.code)
def test_create_delete_es_with_subnet(self):
with self.subnet(cidr='10.10.1.0/24') as subnet:

View File

@@ -11,8 +11,6 @@
# under the License.
#
import contextlib
import mock
from neutron import context
from neutron.tests.unit.db import test_db_base_plugin_v2
@@ -103,18 +101,16 @@ class TestNeutronClient(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
token_store.admin_auth_token = 'new_token'
my_context = context.ContextBase('userid', 'my_tenantid',
auth_token='token')
with contextlib.nested(
mock.patch.object(client.Client, "list_networks",
side_effect=mock.Mock),
mock.patch.object(client.Client, 'get_auth_info',
return_value={'auth_token': 'new_token1'}),
):
client1 = neutronclient.get_client(my_context, True)
client1.list_networks(retrieve_all=False)
self.assertEqual('new_token1', token_store.admin_auth_token)
client1 = neutronclient.get_client(my_context, True)
client1.list_networks(retrieve_all=False)
self.assertEqual('new_token1', token_store.admin_auth_token)
with mock.patch.object(client.Client, "list_networks",
side_effect=mock.Mock):
with mock.patch.object(client.Client, 'get_auth_info',
return_value={'auth_token': 'new_token1'}):
client1 = neutronclient.get_client(my_context, True)
client1.list_networks(retrieve_all=False)
self.assertEqual('new_token1', token_store.admin_auth_token)
client1 = neutronclient.get_client(my_context, True)
client1.list_networks(retrieve_all=False)
self.assertEqual('new_token1', token_store.admin_auth_token)
def test_admin_token_updated(self):
CONF.set_override('neutron_server_url',
@@ -128,15 +124,13 @@ class TestNeutronClient(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
tokens = [{'auth_token': 'new_token1'}, {'auth_token': 'new_token'}]
my_context = context.ContextBase('userid', 'my_tenantid',
auth_token='token')
with contextlib.nested(
mock.patch.object(client.Client, "list_networks",
side_effect=mock.Mock),
mock.patch.object(client.Client, 'get_auth_info',
side_effect=tokens.pop),
):
client1 = neutronclient.get_client(my_context, True)
client1.list_networks(retrieve_all=False)
self.assertEqual('new_token', token_store.admin_auth_token)
client1 = neutronclient.get_client(my_context, True)
client1.list_networks(retrieve_all=False)
self.assertEqual('new_token1', token_store.admin_auth_token)
with mock.patch.object(client.Client, "list_networks",
side_effect=mock.Mock):
with mock.patch.object(client.Client, 'get_auth_info',
side_effect=tokens.pop):
client1 = neutronclient.get_client(my_context, True)
client1.list_networks(retrieve_all=False)
self.assertEqual('new_token', token_store.admin_auth_token)
client1 = neutronclient.get_client(my_context, True)
client1.list_networks(retrieve_all=False)
self.assertEqual('new_token1', token_store.admin_auth_token)

View File

@@ -12,7 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import mock
from gbpservice.neutron.services.grouppolicy.drivers.oneconvergence import (
@@ -68,23 +67,25 @@ class TestPolicyTarget(OneConvergenceGBPDriverTestCase,
# Functionality tests and api results are covered by the base class tests
def test_oneconvergence_controller_api_invoked(self):
with contextlib.nested(
mock.patch.object(MockNVSDApiClient, 'create_endpoint'),
mock.patch.object(MockNVSDApiClient, 'update_endpoint'),
mock.patch.object(MockNVSDApiClient, 'delete_endpoint')
) as (create_ep, update_ep, delete_ep):
ptg = self.create_policy_target_group(name="ptg1")
ptg_id = ptg['policy_target_group']['id']
with mock.patch.object(MockNVSDApiClient,
'create_endpoint') as create_ep:
with mock.patch.object(MockNVSDApiClient,
'update_endpoint') as update_ep:
with mock.patch.object(MockNVSDApiClient,
'delete_endpoint') as delete_ep:
ptg = self.create_policy_target_group(name="ptg1")
ptg_id = ptg['policy_target_group']['id']
# Create policy_target with implicit port.
pt = self.create_policy_target(
name="pt1", policy_target_group_id=ptg_id)['policy_target']
create_ep.assert_called_once_with(mock.ANY, pt)
pt = self.update_policy_target(
pt['id'], name="new_pt")['policy_target']
update_ep.assert_called_once_with(mock.ANY, pt)
self.delete_policy_target(pt['id'])
delete_ep.assert_called_once_with(mock.ANY, pt['id'])
# Create policy_target with implicit port.
pt = self.create_policy_target(
name="pt1",
policy_target_group_id=ptg_id)['policy_target']
create_ep.assert_called_once_with(mock.ANY, pt)
pt = self.update_policy_target(
pt['id'], name="new_pt")['policy_target']
update_ep.assert_called_once_with(mock.ANY, pt)
self.delete_policy_target(pt['id'])
delete_ep.assert_called_once_with(mock.ANY, pt['id'])
class TestPolicyTargetGroup(OneConvergenceGBPDriverTestCase,
@@ -110,43 +111,50 @@ class TestPolicyTargetGroup(OneConvergenceGBPDriverTestCase,
pass
def test_oneconvergence_controller_api_invoked(self):
with contextlib.nested(
mock.patch.object(MockNVSDApiClient, 'create_endpointgroup'),
mock.patch.object(MockNVSDApiClient, 'update_endpointgroup'),
mock.patch.object(MockNVSDApiClient, 'delete_endpointgroup')
) as (create_epg, update_epg, delete_epg):
ptg = self.create_policy_target_group(
name="ptg1")['policy_target_group']
create_epg.assert_called_once_with(mock.ANY, ptg)
ptg = self.update_policy_target_group(
ptg['id'], name="new_ptg")['policy_target_group']
update_epg.assert_called_once_with(mock.ANY, ptg)
self.delete_policy_target_group(ptg['id'])
delete_epg.assert_called_once_with(mock.ANY, ptg['id'])
with mock.patch.object(MockNVSDApiClient,
'create_endpointgroup') as create_epg:
with mock.patch.object(MockNVSDApiClient,
'update_endpointgroup') as update_epg:
with mock.patch.object(MockNVSDApiClient,
'delete_endpointgroup') as delete_epg:
ptg = self.create_policy_target_group(
name="ptg1")['policy_target_group']
create_epg.assert_called_once_with(mock.ANY, ptg)
ptg = self.update_policy_target_group(
ptg['id'],
name="new_ptg")['policy_target_group']
update_epg.assert_called_once_with(mock.ANY, ptg)
self.delete_policy_target_group(ptg['id'])
delete_epg.assert_called_once_with(mock.ANY, ptg['id'])
class TestPolicyClassifier(OneConvergenceGBPDriverTestCase):
def test_oneconvergence_controller_api_invoked(self):
with contextlib.nested(
mock.patch.object(
MockNVSDApiClient, 'create_policy_classifier'),
mock.patch.object(
MockNVSDApiClient, 'update_policy_classifier'),
mock.patch.object(
MockNVSDApiClient, 'delete_policy_classifier')
) as (create_classifier, update_classifier, delete_classifier):
classifier = self.create_policy_classifier(name="classifier1")
classifier = classifier['policy_classifier']
classifier.update({"policy_rules": []})
create_classifier.assert_called_once_with(mock.ANY, classifier)
classifier = self.update_policy_classifier(
classifier['id'], name="new_classifier")['policy_classifier']
classifier.update({"policy_rules": []})
update_classifier.assert_called_once_with(mock.ANY, classifier)
self.delete_policy_classifier(classifier['id'])
delete_classifier.assert_called_once_with(
mock.ANY, classifier['id'])
with mock.patch.object(
MockNVSDApiClient,
'create_policy_classifier') as create_classifier:
with mock.patch.object(
MockNVSDApiClient,
'update_policy_classifier') as update_classifier:
with mock.patch.object(
MockNVSDApiClient,
'delete_policy_classifier') as delete_classifier:
classifier = self.create_policy_classifier(
name="classifier1")
classifier = classifier['policy_classifier']
classifier.update({"policy_rules": []})
create_classifier.assert_called_once_with(mock.ANY,
classifier)
classifier = self.update_policy_classifier(
classifier['id'],
name="new_classifier")['policy_classifier']
classifier.update({"policy_rules": []})
update_classifier.assert_called_once_with(mock.ANY,
classifier)
self.delete_policy_classifier(classifier['id'])
delete_classifier.assert_called_once_with(
mock.ANY, classifier['id'])
class TestL2Policy(OneConvergenceGBPDriverTestCase,

View File

@@ -11,7 +11,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import itertools
import mock
@@ -550,23 +549,22 @@ class TestPolicyTargetGroup(ResourceMappingTestCase):
network = self.deserialize(self.fmt, req.get_response(self.api))
# Create policy_target group with explicit subnet.
with contextlib.nested(
self.subnet(network=network, cidr='10.10.1.0/24'),
self.subnet(network=network, cidr='10.10.2.0/24')
) as (subnet1, subnet2):
subnet1_id = subnet1['subnet']['id']
subnet2_id = subnet2['subnet']['id']
subnets = [subnet1_id]
ptg = self.create_policy_target_group(
l2_policy_id=l2p_id, subnets=subnets)
ptg_id = ptg['policy_target_group']['id']
with self.subnet(network=network, cidr='10.10.1.0/24') as subnet1:
with self.subnet(network=network, cidr='10.10.2.0/24') as subnet2:
subnet1_id = subnet1['subnet']['id']
subnet2_id = subnet2['subnet']['id']
subnets = [subnet1_id]
ptg = self.create_policy_target_group(
l2_policy_id=l2p_id, subnets=subnets)
ptg_id = ptg['policy_target_group']['id']
# Add subnet.
subnets = [subnet1_id, subnet2_id]
data = {'policy_target_group': {'subnets': subnets}}
req = self.new_update_request('policy_target_groups', data, ptg_id)
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, webob.exc.HTTPOk.code)
# Add subnet.
subnets = [subnet1_id, subnet2_id]
data = {'policy_target_group': {'subnets': subnets}}
req = self.new_update_request('policy_target_groups', data,
ptg_id)
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, webob.exc.HTTPOk.code)
def test_add_subnet_negative(self):
# Create L2P
@@ -604,23 +602,23 @@ class TestPolicyTargetGroup(ResourceMappingTestCase):
network = self.deserialize(self.fmt, req.get_response(self.api))
# Create policy_target group with explicit subnets.
with contextlib.nested(
self.subnet(network=network, cidr='10.10.1.0/24'),
self.subnet(network=network, cidr='10.10.2.0/24')
) as (subnet1, subnet2):
subnet1_id = subnet1['subnet']['id']
subnet2_id = subnet2['subnet']['id']
subnets = [subnet1_id, subnet2_id]
ptg = self.create_policy_target_group(
l2_policy_id=l2p_id, subnets=subnets)
ptg_id = ptg['policy_target_group']['id']
with self.subnet(network=network, cidr='10.10.1.0/24') as subnet1:
with self.subnet(network=network, cidr='10.10.2.0/24') as subnet2:
subnet1_id = subnet1['subnet']['id']
subnet2_id = subnet2['subnet']['id']
subnets = [subnet1_id, subnet2_id]
ptg = self.create_policy_target_group(
l2_policy_id=l2p_id, subnets=subnets)
ptg_id = ptg['policy_target_group']['id']
# Verify removing subnet rejected.
data = {'policy_target_group': {'subnets': [subnet2_id]}}
req = self.new_update_request('policy_target_groups', data, ptg_id)
data = self.deserialize(self.fmt, req.get_response(self.ext_api))
self.assertEqual('PolicyTargetGroupSubnetRemovalNotSupported',
data['NeutronError']['type'])
# Verify removing subnet rejected.
data = {'policy_target_group': {'subnets': [subnet2_id]}}
req = self.new_update_request('policy_target_groups', data,
ptg_id)
data = self.deserialize(self.fmt, req.get_response(
self.ext_api))
self.assertEqual('PolicyTargetGroupSubnetRemovalNotSupported',
data['NeutronError']['type'])
def test_subnet_allocation(self):
ptg1 = self.create_policy_target_group(name="ptg1")
@@ -951,16 +949,16 @@ class TestL3Policy(ResourceMappingTestCase):
def test_multiple_routers_rejected(self):
# Verify update l3 policy with explicit router rejected.
with contextlib.nested(self.router(),
self.router()) as (router1, router2):
router1_id = router1['router']['id']
router2_id = router2['router']['id']
data = self.create_l3_policy(name="l3p1",
routers=[router1_id, router2_id],
expected_res_status=
webob.exc.HTTPBadRequest.code)
self.assertEqual('L3PolicyMultipleRoutersNotSupported',
data['NeutronError']['type'])
with self.router() as router1:
with self.router() as router2:
router1_id = router1['router']['id']
router2_id = router2['router']['id']
data = self.create_l3_policy(name="l3p1",
routers=[router1_id, router2_id],
expected_res_status=
webob.exc.HTTPBadRequest.code)
self.assertEqual('L3PolicyMultipleRoutersNotSupported',
data['NeutronError']['type'])
def test_router_update_rejected(self):
# Create L3 policy with implicit router.
@@ -1019,151 +1017,152 @@ class TestL3Policy(ResourceMappingTestCase):
def test_create_l3p_es(self):
# Simple test to verify l3p created with 1-N ES
with contextlib.nested(
self.network(router__external=True),
self.network(router__external=True)) as (net1, net2):
with contextlib.nested(
self.subnet(cidr='10.10.1.0/24', network=net1),
self.subnet(cidr='10.10.2.0/24', network=net2)) as (
sub1, sub2):
es1 = self.create_external_segment(
subnet_id=sub1['subnet']['id'])['external_segment']
es2 = self.create_external_segment(
subnet_id=sub2['subnet']['id'])['external_segment']
external_segments = {es1['id']: []}
l3p = self.create_l3_policy(
ip_pool='192.168.0.0/16', expected_res_status=201,
external_segments=external_segments)
req = self.new_delete_request('l3_policies',
l3p['l3_policy']['id'])
req.get_response(self.ext_api)
external_segments.update({es2['id']: []})
res = self.create_l3_policy(
ip_pool='192.168.0.0/16', expected_res_status=400,
external_segments=external_segments)
self.assertEqual('MultipleESPerL3PolicyNotSupported',
res['NeutronError']['type'])
with self.network(router__external=True) as net1:
with self.network(router__external=True) as net2:
with self.subnet(cidr='10.10.1.0/24', network=net1) as sub1:
with self.subnet(cidr='10.10.2.0/24',
network=net2) as sub2:
es1 = self.create_external_segment(
subnet_id=sub1['subnet']['id'])['external_segment']
es2 = self.create_external_segment(
subnet_id=sub2['subnet']['id'])['external_segment']
external_segments = {es1['id']: []}
l3p = self.create_l3_policy(
ip_pool='192.168.0.0/16', expected_res_status=201,
external_segments=external_segments)
req = self.new_delete_request('l3_policies',
l3p['l3_policy']['id'])
req.get_response(self.ext_api)
external_segments.update({es2['id']: []})
res = self.create_l3_policy(
ip_pool='192.168.0.0/16', expected_res_status=400,
external_segments=external_segments)
self.assertEqual('MultipleESPerL3PolicyNotSupported',
res['NeutronError']['type'])
def test_update_l3p_es(self):
# Simple test to verify l3p updated with 1-N ES
with contextlib.nested(
self.network(router__external=True),
self.network(router__external=True)) as (net1, net2):
with contextlib.nested(
self.subnet(cidr='10.10.1.0/24', network=net1),
self.subnet(cidr='10.10.2.0/24', network=net2)) as (
sub1, sub2):
es1 = self.create_external_segment(
subnet_id=sub1['subnet']['id'])['external_segment']
es2 = self.create_external_segment(
subnet_id=sub2['subnet']['id'])['external_segment']
# None to es1, es1 to es2
l3p = self.create_l3_policy(
ip_pool='192.168.0.0/16')['l3_policy']
for external_segments in [{es1['id']: []}, {es2['id']: []}]:
self.update_l3_policy(
l3p['id'], expected_res_status=200,
external_segments=external_segments)
# es2 to [es1, es2]
external_segments = {es2['id']: [], es1['id']: []}
res = self.update_l3_policy(
l3p['id'], expected_res_status=400,
external_segments=external_segments)
self.assertEqual('MultipleESPerL3PolicyNotSupported',
res['NeutronError']['type'])
with self.network(router__external=True) as net1:
with self.network(router__external=True) as net2:
with self.subnet(cidr='10.10.1.0/24', network=net1) as sub1:
with self.subnet(cidr='10.10.2.0/24',
network=net2) as sub2:
es1 = self.create_external_segment(
subnet_id=sub1['subnet']['id'])['external_segment']
es2 = self.create_external_segment(
subnet_id=sub2['subnet']['id'])['external_segment']
# None to es1, es1 to es2
l3p = self.create_l3_policy(
ip_pool='192.168.0.0/16')['l3_policy']
for external_segments in [{es1['id']: []}, {es2['id']:
[]}]:
self.update_l3_policy(
l3p['id'], expected_res_status=200,
external_segments=external_segments)
# es2 to [es1, es2]
external_segments = {es2['id']: [], es1['id']: []}
res = self.update_l3_policy(
l3p['id'], expected_res_status=400,
external_segments=external_segments)
self.assertEqual('MultipleESPerL3PolicyNotSupported',
res['NeutronError']['type'])
def test_es_router_plumbing(self):
with contextlib.nested(
self.network(router__external=True),
self.network(router__external=True)) as (net1, net2):
with contextlib.nested(
self.subnet(cidr='10.10.1.0/24', network=net1),
self.subnet(cidr='10.10.2.0/24', network=net2)) as (
subnet1, subnet2):
subnet1 = subnet1['subnet']
subnet2 = subnet2['subnet']
es1 = self.create_external_segment(
subnet_id=subnet1['id'])['external_segment']
es2 = self.create_external_segment(
subnet_id=subnet2['id'])['external_segment']
es_dict = {es1['id']: ['10.10.1.3']}
l3p = self.create_l3_policy(
ip_pool='192.168.0.0/16',
external_segments=es_dict)['l3_policy']
req = self.new_show_request('routers', l3p['routers'][0],
fmt=self.fmt)
res = self.deserialize(self.fmt, req.get_response(
self.ext_api))['router']
self.assertEqual(
subnet1['network_id'],
res['external_gateway_info']['network_id'])
# Verify auto assigned addresses propagated to L3P
es_dict = {es2['id']: []}
l3p = self.update_l3_policy(
l3p['id'], external_segments=es_dict,
expected_res_status=200)['l3_policy']
req = self.new_show_request('routers', l3p['routers'][0],
fmt=self.fmt)
res = self.deserialize(self.fmt, req.get_response(
self.ext_api))['router']
self.assertEqual(
subnet2['network_id'],
res['external_gateway_info']['network_id'])
self.assertEqual(
[x['ip_address'] for x in
res['external_gateway_info']['external_fixed_ips']],
l3p['external_segments'][es2['id']])
# Verify that the implicit assignment is persisted
req = self.new_show_request('l3_policies', l3p['id'],
fmt=self.fmt)
l3p = self.deserialize(self.fmt, req.get_response(
self.ext_api))['l3_policy']
self.assertEqual(
[x['ip_address'] for x in
res['external_gateway_info']['external_fixed_ips']],
l3p['external_segments'][es2['id']])
with self.network(router__external=True) as net1:
with self.network(router__external=True) as net2:
with self.subnet(cidr='10.10.1.0/24', network=net1) as sub1:
with self.subnet(cidr='10.10.2.0/24',
network=net2) as sub2:
subnet1 = sub1['subnet']
subnet2 = sub2['subnet']
es1 = self.create_external_segment(
subnet_id=subnet1['id'])['external_segment']
es2 = self.create_external_segment(
subnet_id=subnet2['id'])['external_segment']
es_dict = {es1['id']: ['10.10.1.3']}
l3p = self.create_l3_policy(
ip_pool='192.168.0.0/16',
external_segments=es_dict)['l3_policy']
req = self.new_show_request('routers',
l3p['routers'][0],
fmt=self.fmt)
res = self.deserialize(self.fmt, req.get_response(
self.ext_api))['router']
self.assertEqual(
subnet1['network_id'],
res['external_gateway_info']['network_id'])
# Verify auto assigned addresses propagated to L3P
es_dict = {es2['id']: []}
l3p = self.update_l3_policy(
l3p['id'], external_segments=es_dict,
expected_res_status=200)['l3_policy']
req = self.new_show_request('routers',
l3p['routers'][0],
fmt=self.fmt)
res = self.deserialize(self.fmt, req.get_response(
self.ext_api))['router']
self.assertEqual(
subnet2['network_id'],
res['external_gateway_info']['network_id'])
self.assertEqual(
[x['ip_address'] for x in
res['external_gateway_info'][
'external_fixed_ips']],
l3p['external_segments'][es2['id']])
# Verify that the implicit assignment is persisted
req = self.new_show_request('l3_policies', l3p['id'],
fmt=self.fmt)
l3p = self.deserialize(self.fmt, req.get_response(
self.ext_api))['l3_policy']
self.assertEqual(
[x['ip_address'] for x in
res['external_gateway_info'][
'external_fixed_ips']],
l3p['external_segments'][es2['id']])
def test_es_routes(self):
routes1 = [{'destination': '0.0.0.0/0', 'nexthop': '10.10.1.1'},
{'destination': '172.0.0.0/16', 'nexthop': '10.10.1.1'}]
routes2 = [{'destination': '0.0.0.0/0', 'nexthop': '10.10.2.1'},
{'destination': '172.0.0.0/16', 'nexthop': '10.10.2.1'}]
with contextlib.nested(
self.network(router__external=True),
self.network(router__external=True)) as (net1, net2):
with contextlib.nested(
self.subnet(cidr='10.10.1.0/24', network=net1),
self.subnet(cidr='10.10.2.0/24', network=net2)) as (
sub1, sub2):
es1 = self.create_external_segment(
cidr='10.10.1.0/24',
subnet_id=sub1['subnet']['id'],
external_routes=routes1)['external_segment']
es2 = self.create_external_segment(
cidr='10.10.2.0/24',
subnet_id=sub2['subnet']['id'],
external_routes=routes2)['external_segment']
es_dict = {es1['id']: []}
l3p = self.create_l3_policy(
ip_pool='192.168.0.0/16', external_segments=es_dict,
expected_res_status=201)['l3_policy']
req = self.new_show_request('routers', l3p['routers'][0],
fmt=self.fmt)
res = self.deserialize(self.fmt,
req.get_response(self.ext_api))
self.assertEqual(routes1, res['router']['routes'])
es_dict = {es2['id']: []}
self.update_l3_policy(l3p['id'], external_segments=es_dict,
expected_res_status=200)
req = self.new_show_request('routers', l3p['routers'][0],
fmt=self.fmt)
res = self.deserialize(self.fmt,
req.get_response(self.ext_api))
self.assertEqual(routes2, res['router']['routes'])
with self.network(router__external=True) as net1:
with self.network(router__external=True) as net2:
with self.subnet(cidr='10.10.1.0/24', network=net1) as sub1:
with self.subnet(cidr='10.10.2.0/24',
network=net2) as sub2:
es1 = self.create_external_segment(
cidr='10.10.1.0/24',
subnet_id=sub1['subnet']['id'],
external_routes=routes1)['external_segment']
es2 = self.create_external_segment(
cidr='10.10.2.0/24',
subnet_id=sub2['subnet']['id'],
external_routes=routes2)['external_segment']
es_dict = {es1['id']: []}
l3p = self.create_l3_policy(
ip_pool='192.168.0.0/16',
external_segments=es_dict,
expected_res_status=201)['l3_policy']
req = self.new_show_request('routers',
l3p['routers'][0],
fmt=self.fmt)
res = self.deserialize(self.fmt,
req.get_response(self.ext_api))
self.assertEqual(routes1, res['router']['routes'])
es_dict = {es2['id']: []}
self.update_l3_policy(l3p['id'],
external_segments=es_dict,
expected_res_status=200)
req = self.new_show_request('routers',
l3p['routers'][0],
fmt=self.fmt)
res = self.deserialize(self.fmt,
req.get_response(self.ext_api))
self.assertEqual(routes2, res['router']['routes'])
def test_create_l3p_using_different_tenant_router_rejected(self):
with contextlib.nested(self.router()) as router1:
router1_id = router1[0]['router']['id']
with self.router() as router1:
router1_id = router1['router']['id']
res = self.create_l3_policy(name="l3p1",
tenant_id='tenant2',
routers=[router1_id],
@@ -2517,105 +2516,103 @@ class TestExternalPolicy(ResourceMappingTestCase):
def test_create(self):
with self.network(router__external=True) as net:
with contextlib.nested(
self.subnet(cidr='10.10.1.0/24', network=net),
self.subnet(cidr='10.10.2.0/24', network=net)) as (
sub1, sub2):
es1 = self.create_external_segment(
subnet_id=sub1['subnet']['id'],
shared=True)['external_segment']
es2 = self.create_external_segment(
subnet_id=sub2['subnet']['id'])['external_segment']
# Shared Rejected
res = self.create_external_policy(
expected_res_status=400, external_segments=[es1['id']],
shared=True)
self.assertEqual('InvalidSharedResource',
res['NeutronError']['type'])
# Multiple ES reject
res = self.create_external_policy(
expected_res_status=400,
external_segments=[es1['id'], es2['id']])
self.assertEqual('MultipleESPerEPNotSupported',
res['NeutronError']['type'])
# No ES reject
res = self.create_external_policy(
expected_res_status=400, external_segments=[])
self.assertEqual('ESIdRequiredWhenCreatingEP',
res['NeutronError']['type'])
with self.subnet(cidr='10.10.1.0/24', network=net) as sub1:
with self.subnet(cidr='10.10.2.0/24', network=net) as sub2:
es1 = self.create_external_segment(
subnet_id=sub1['subnet']['id'],
shared=True)['external_segment']
es2 = self.create_external_segment(
subnet_id=sub2['subnet']['id'])['external_segment']
# Shared Rejected
res = self.create_external_policy(
expected_res_status=400, external_segments=[es1['id']],
shared=True)
self.assertEqual('InvalidSharedResource',
res['NeutronError']['type'])
# Multiple ES reject
res = self.create_external_policy(
expected_res_status=400,
external_segments=[es1['id'], es2['id']])
self.assertEqual('MultipleESPerEPNotSupported',
res['NeutronError']['type'])
# No ES reject
res = self.create_external_policy(
expected_res_status=400, external_segments=[])
self.assertEqual('ESIdRequiredWhenCreatingEP',
res['NeutronError']['type'])
# Multiple EP per tenant rejected
self.create_external_policy(external_segments=[es1['id']],
expected_res_status=201)
res = self.create_external_policy(
expected_res_status=400, external_segments=[es2['id']])
self.assertEqual('OnlyOneEPPerTenantAllowed',
res['NeutronError']['type'])
# Multiple EP per tenant rejected
self.create_external_policy(external_segments=[es1['id']],
expected_res_status=201)
res = self.create_external_policy(
expected_res_status=400, external_segments=[es2['id']])
self.assertEqual('OnlyOneEPPerTenantAllowed',
res['NeutronError']['type'])
def test_update(self):
with self.network(router__external=True) as net:
with contextlib.nested(
self.subnet(cidr='10.10.1.0/24', network=net),
self.subnet(cidr='10.10.2.0/24', network=net)) as (
sub1, sub2):
route = {'destination': '172.0.0.0/8', 'nexthop': None}
es1 = self.create_external_segment(
subnet_id=sub1['subnet']['id'],
external_routes=[route])['external_segment']
es2 = self.create_external_segment(
subnet_id=sub2['subnet']['id'])['external_segment']
ep = self.create_external_policy(
external_segments=[es1['id']], expected_res_status=201)
ep = ep['external_policy']
# ES update rejectes
res = self.update_external_policy(
ep['id'], external_segments=[es2['id']],
expected_res_status=400)
self.assertEqual('ESUpdateNotSupportedForEP',
res['NeutronError']['type'])
# Rules changed when changing PRS
pr_ssh = self._create_ssh_allow_rule()
pr_http = self._create_http_allow_rule()
with self.subnet(cidr='10.10.1.0/24', network=net) as sub1:
with self.subnet(cidr='10.10.2.0/24', network=net) as sub2:
route = {'destination': '172.0.0.0/8', 'nexthop': None}
es1 = self.create_external_segment(
subnet_id=sub1['subnet']['id'],
external_routes=[route])['external_segment']
es2 = self.create_external_segment(
subnet_id=sub2['subnet']['id'])['external_segment']
ep = self.create_external_policy(
external_segments=[es1['id']], expected_res_status=201)
ep = ep['external_policy']
# ES update rejectes
res = self.update_external_policy(
ep['id'], external_segments=[es2['id']],
expected_res_status=400)
self.assertEqual('ESUpdateNotSupportedForEP',
res['NeutronError']['type'])
# Rules changed when changing PRS
pr_ssh = self._create_ssh_allow_rule()
pr_http = self._create_http_allow_rule()
prs_ssh = self.create_policy_rule_set(
policy_rules=[pr_ssh['id']])['policy_rule_set']
prs_http = self.create_policy_rule_set(
policy_rules=[pr_http['id']])['policy_rule_set']
prs_ssh = self.create_policy_rule_set(
policy_rules=[pr_ssh['id']])['policy_rule_set']
prs_http = self.create_policy_rule_set(
policy_rules=[pr_http['id']])['policy_rule_set']
self.update_external_policy(
ep['id'], provided_policy_rule_sets={prs_ssh['id']: ''},
consumed_policy_rule_sets={prs_ssh['id']: ''},
expected_res_status=200)
self.update_external_policy(
ep['id'], provided_policy_rule_sets={prs_ssh['id']:
''},
consumed_policy_rule_sets={prs_ssh['id']: ''},
expected_res_status=200)
expected_cidrs = self._calculate_expected_external_cidrs(
es1, [])
self.assertTrue(len(expected_cidrs) > 0)
current_ssh_rules = self._verify_prs_rules(prs_ssh['id'])
self._verify_prs_rules(prs_http['id'])
expected_cidrs = self._calculate_expected_external_cidrs(
es1, [])
self.assertTrue(len(expected_cidrs) > 0)
current_ssh_rules = self._verify_prs_rules(prs_ssh['id'])
self._verify_prs_rules(prs_http['id'])
# Now swap the contract
self.update_external_policy(
ep['id'], provided_policy_rule_sets={prs_http['id']: ''},
consumed_policy_rule_sets={prs_http['id']: ''},
expected_res_status=200)
# Now swap the contract
self.update_external_policy(
ep['id'], provided_policy_rule_sets={prs_http['id']:
''},
consumed_policy_rule_sets={prs_http['id']: ''},
expected_res_status=200)
# SSH rules removed
for rule in current_ssh_rules:
if not (rule['direction'] == ['egress']
and rule['remote_ip_prefix'] == ['0.0.0.0/0']):
self.assertFalse(self._get_sg_rule(**rule))
# SSH rules removed
for rule in current_ssh_rules:
if not (rule['direction'] == ['egress']
and rule['remote_ip_prefix'] == ['0.0.0.0/0']):
self.assertFalse(self._get_sg_rule(**rule))
# HTTP Added
current_http_rules = self._verify_prs_rules(prs_http['id'])
# HTTP Added
current_http_rules = self._verify_prs_rules(prs_http['id'])
# All removed
self.update_external_policy(
ep['id'], provided_policy_rule_sets={},
consumed_policy_rule_sets={}, expected_res_status=200)
for rule in current_http_rules:
if not (rule['direction'] == ['egress']
and rule['remote_ip_prefix'] == ['0.0.0.0/0']):
self.assertFalse(self._get_sg_rule(**rule))
# All removed
self.update_external_policy(
ep['id'], provided_policy_rule_sets={},
consumed_policy_rule_sets={}, expected_res_status=200)
for rule in current_http_rules:
if not (rule['direction'] == ['egress']
and rule['remote_ip_prefix'] == ['0.0.0.0/0']):
self.assertFalse(self._get_sg_rule(**rule))
class TestPolicyAction(ResourceMappingTestCase):

View File

@ -11,8 +11,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import heatclient
import mock
from neutron.openstack.common import uuidutils
@ -102,54 +100,58 @@ class TestServiceChainInstance(SimpleChainDriverTestCase):
stack3 = {'stack': {'id': uuidutils.generate_uuid()}}
expected_create_calls = []
expected_delete_calls = []
with contextlib.nested(
mock.patch.object(simplechain_driver.HeatClient,
'create'),
mock.patch.object(simplechain_driver.HeatClient,
'delete'),
) as (stack_create, stack_delete):
stack_create.return_value = stack1
instance1_name = "sc_instance_1"
sc_instance1 = self.create_servicechain_instance(
name=instance1_name,
servicechain_specs=[sc_spec_id])
self.assertEqual([sc_spec_id],
sc_instance1['servicechain_instance']['servicechain_specs'])
stack_name = "stack_" + instance1_name + scn1_name + scn_id[:5]
expected_create_calls.append(
mock.call(stack_name, jsonutils.loads(template1), {}))
stack_create.return_value = stack2
instance2_name = "sc_instance_2"
sc_instance2 = self.create_servicechain_instance(
name=instance2_name,
servicechain_specs=[sc_spec_id])
self.assertEqual([sc_spec_id],
sc_instance2['servicechain_instance']['servicechain_specs'])
stack_name = "stack_" + instance2_name + scn1_name + scn_id[:5]
expected_create_calls.append(
mock.call(stack_name, jsonutils.loads(template1), {}))
with mock.patch.object(simplechain_driver.HeatClient,
'create') as stack_create:
with mock.patch.object(simplechain_driver.HeatClient,
'delete') as stack_delete:
stack_create.return_value = stack1
instance1_name = "sc_instance_1"
sc_instance1 = self.create_servicechain_instance(
name=instance1_name,
servicechain_specs=[sc_spec_id])
self.assertEqual([sc_spec_id],
sc_instance1['servicechain_instance'][
'servicechain_specs'])
stack_name = "stack_" + instance1_name + scn1_name + scn_id[:5]
expected_create_calls.append(
mock.call(stack_name,
jsonutils.loads(template1), {}))
stack_create.return_value = stack2
instance2_name = "sc_instance_2"
sc_instance2 = self.create_servicechain_instance(
name=instance2_name,
servicechain_specs=[sc_spec_id])
self.assertEqual(
[sc_spec_id],
sc_instance2['servicechain_instance'][
'servicechain_specs'])
stack_name = "stack_" + instance2_name + scn1_name + scn_id[:5]
expected_create_calls.append(
mock.call(stack_name, jsonutils.loads(template1), {}))
#Now perform an update of the spec
new_spec = {'servicechain_spec': {'nodes': [scn2_id]}}
stack_create.return_value = stack3
req = self.new_update_request(
'servicechain_specs', new_spec, sc_spec_id)
res = req.get_response(self.ext_api)
self.assertEqual(webob.exc.HTTPOk.code, res.status_int)
# The two existing stacks will be deleted and two new stacks
# will be created
expected_delete_calls.append(mock.call(stack1['stack']['id']))
expected_delete_calls.append(mock.call(stack2['stack']['id']))
stack_name = "stack_" + instance1_name + scn2_name + scn2_id[:5]
expected_create_calls.append(
mock.call(stack_name, jsonutils.loads(template2), {}))
stack_name = "stack_" + instance2_name + scn2_name + scn2_id[:5]
expected_create_calls.append(
mock.call(stack_name, jsonutils.loads(template2), {}))
self.assertEqual(expected_delete_calls,
stack_delete.call_args_list)
self.assertEqual(expected_create_calls,
stack_create.call_args_list)
#Now perform an update of the spec
new_spec = {'servicechain_spec': {'nodes': [scn2_id]}}
stack_create.return_value = stack3
req = self.new_update_request(
'servicechain_specs', new_spec, sc_spec_id)
res = req.get_response(self.ext_api)
self.assertEqual(webob.exc.HTTPOk.code, res.status_int)
# The two existing stacks will be deleted and two new stacks
# will be created
expected_delete_calls.append(mock.call(stack1['stack']['id']))
expected_delete_calls.append(mock.call(stack2['stack']['id']))
stack_name = ("stack_" + instance1_name + scn2_name +
scn2_id[:5])
expected_create_calls.append(
mock.call(stack_name, jsonutils.loads(template2), {}))
stack_name = ("stack_" + instance2_name + scn2_name +
scn2_id[:5])
expected_create_calls.append(
mock.call(stack_name, jsonutils.loads(template2), {}))
self.assertEqual(expected_delete_calls,
stack_delete.call_args_list)
self.assertEqual(expected_create_calls,
stack_create.call_args_list)
def test_chain_instance_create(self):
name = "scs1"
@ -217,18 +219,20 @@ class TestServiceChainInstance(SimpleChainDriverTestCase):
# Verify that as part of delete service chain instance we call
# get method for heat stack 5 times before giving up if the state
# does not become DELETE_COMPLETE
with contextlib.nested(
mock.patch.object(simplechain_driver.HeatClient, 'delete'),
mock.patch.object(simplechain_driver.HeatClient, 'get')) as (
stack_delete, stack_get):
stack_get.return_value = MockStackObject('PENDING_DELETE')
req = self.new_delete_request(
'servicechain_instances',
sc_instance['servicechain_instance']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(webob.exc.HTTPNoContent.code, res.status_int)
stack_delete.assert_called_once_with(mock.ANY)
self.assertEqual(STACK_DELETE_RETRIES, stack_get.call_count)
with mock.patch.object(simplechain_driver.HeatClient,
'delete') as stack_delete:
with mock.patch.object(simplechain_driver.HeatClient,
'get') as stack_get:
stack_get.return_value = MockStackObject('PENDING_DELETE')
req = self.new_delete_request(
'servicechain_instances',
sc_instance['servicechain_instance']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(webob.exc.HTTPNoContent.code,
res.status_int)
stack_delete.assert_called_once_with(mock.ANY)
self.assertEqual(STACK_DELETE_RETRIES,
stack_get.call_count)
# Create and delete another service chain instance and verify that
# we call get method for heat stack only once if the stack state
@ -236,20 +240,23 @@ class TestServiceChainInstance(SimpleChainDriverTestCase):
sc_instance = self.create_servicechain_instance(
name="sc_instance_1",
servicechain_specs=[sc_spec_id])
self.assertEqual([sc_spec_id],
self.assertEqual(
[sc_spec_id],
sc_instance['servicechain_instance']['servicechain_specs'])
with contextlib.nested(
mock.patch.object(simplechain_driver.HeatClient, 'delete'),
mock.patch.object(simplechain_driver.HeatClient, 'get')) as (
stack_delete, stack_get):
stack_get.return_value = MockStackObject('DELETE_COMPLETE')
req = self.new_delete_request(
'servicechain_instances',
sc_instance['servicechain_instance']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(webob.exc.HTTPNoContent.code, res.status_int)
stack_delete.assert_called_once_with(mock.ANY)
self.assertEqual(1, stack_get.call_count)
with mock.patch.object(simplechain_driver.HeatClient,
'delete') as stack_delete:
with mock.patch.object(simplechain_driver.HeatClient,
'get') as stack_get:
stack_get.return_value = MockStackObject(
'DELETE_COMPLETE')
req = self.new_delete_request(
'servicechain_instances',
sc_instance['servicechain_instance']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(webob.exc.HTTPNoContent.code,
res.status_int)
stack_delete.assert_called_once_with(mock.ANY)
self.assertEqual(1, stack_get.call_count)
def test_stack_not_found_ignored(self):
name = "scs1"