Add security_groups for create_port()

Currently if defined SecurityGroups and SubnetId properties for
instance in heat template, the SecurityGroups were not associated
to the port created within the previous SubnetId, in another word,
the instance are not allocated to the specified security groups
defined in heat template.

    * Modifies function _build_nics() in instance.py, adding an item
'security_groups' in the post body of create_port sending by neutronclient.
    * Adds a new function _get_security_groups_id() to map security_groups
names to ids.
    * Adds corresponding unit tests.

Fixes bug #1221564

Change-Id: Ica2df7c6f96bc982a5bd5ece3611949ae905e5c8
This commit is contained in:
Hui HX Xiang 2013-09-06 05:48:52 -07:00
parent 340dc1bdd2
commit 364031588b
2 changed files with 146 additions and 1 deletions

View File

@ -174,7 +174,8 @@ class Instance(resource.Resource):
logger.info('%s._resolve_attribute(%s) == %s' % (self.name, name, res))
return unicode(res) if res else None
def _build_nics(self, network_interfaces, subnet_id=None):
def _build_nics(self, network_interfaces,
security_groups=None, subnet_id=None):
nics = None
@ -206,11 +207,41 @@ class Instance(resource.Resource):
'network_id': network_id,
'fixed_ips': [fixed_ip]
}
if security_groups:
props['security_groups'] = \
self._get_security_groups_id(security_groups)
port = neutronclient.create_port({'port': props})['port']
nics = [{'port-id': port['id']}]
return nics
def _get_security_groups_id(self, security_groups):
"""Extract security_groups ids from security group list
This function will be deprecated if Neutron client resolves security
group name to id internally.
Args:
security_groups : A list contains security_groups ids or names
Returns:
A list of security_groups ids.
"""
ids = []
response = self.neutron().list_security_groups(self.resource_id)
for item in response:
if item['security_groups'] is not None:
for security_group in security_groups:
for groups in item['security_groups']:
if groups['name'] == security_group \
and groups['id'] not in ids:
ids.append(groups['id'])
elif groups['id'] == security_group \
and groups['id'] not in ids:
ids.append(groups['id'])
return ids
def _get_security_groups(self):
security_groups = []
for property in ('SecurityGroups', 'SecurityGroupIds'):
@ -259,6 +290,7 @@ class Instance(resource.Resource):
scheduler_hints = None
nics = self._build_nics(self.properties['NetworkInterfaces'],
security_groups=security_groups,
subnet_id=self.properties['SubnetId'])
server = None
try:

View File

@ -24,11 +24,14 @@ from heat.engine import parser
from heat.engine import resource
from heat.engine import scheduler
from heat.engine.resources import instance as instances
from heat.engine.resources import network_interface
from heat.engine.resources import nova_utils
from heat.openstack.common import uuidutils
from heat.tests.common import HeatTestCase
from heat.tests import utils
from neutronclient.v2_0 import client as neutronclient
wp_template = '''
{
@ -654,6 +657,116 @@ class InstancesTest(HeatTestCase):
'id5'
]))
def test_build_nics_with_security_groups(self):
"""
Test the security groups defined in heat template can be associated
to a new created port.
"""
return_server = self.fc.servers.list()[1]
instance = self._create_test_instance(return_server,
'test_build_nics')
security_groups = ['security_group_1']
self._test_security_groups(instance, security_groups)
security_groups = ['fake_id_1']
self._test_security_groups(instance, security_groups)
security_groups = ['security_group_1', 'security_group_1']
self._test_security_groups(instance, security_groups)
security_groups = ['fake_id_1', 'fake_id_1']
self._test_security_groups(instance, security_groups)
security_groups = ['security_group_1', 'fake_id_1']
self._test_security_groups(instance, security_groups)
security_groups = ['security_group_1', 'fake_id_2']
self._test_security_groups(instance, security_groups, sg='two')
security_groups = ['wrong_group_id']
self._test_security_groups(instance, security_groups, sg='zero')
security_groups = ['wrong_group_id', 'fake_id_1']
self._test_security_groups(instance, security_groups)
security_groups = ['wrong_group_name']
self._test_security_groups(instance, security_groups, sg='zero')
security_groups = ['wrong_group_name', 'security_group_1']
self._test_security_groups(instance, security_groups)
def _test_security_groups(self, instance, security_groups, sg='one'):
fake_groups_list, props = self._get_fake_properties(sg)
def generate_sg_list():
yield fake_groups_list
nclient = neutronclient.Client()
self.m.StubOutWithMock(instance, 'neutron')
instance.neutron().MultipleTimes().AndReturn(nclient)
self.m.StubOutWithMock(neutronclient.Client, 'list_security_groups')
neutronclient.Client.list_security_groups(
instance.resource_id).AndReturn(generate_sg_list())
net_interface = network_interface.NetworkInterface
self.m.StubOutWithMock(net_interface, 'network_id_from_subnet_id')
net_interface.network_id_from_subnet_id(
nclient,
'fake_subnet_id').MultipleTimes().AndReturn('fake_network_id')
self.m.StubOutWithMock(neutronclient.Client, 'create_port')
neutronclient.Client.create_port(
{'port': props}).MultipleTimes().AndReturn(
{'port': {'id': 'fake_port_id'}})
self.m.ReplayAll()
self.assertEqual(
[{'port-id': 'fake_port_id'}],
instance._build_nics(None,
security_groups=security_groups,
subnet_id='fake_subnet_id'))
self.m.VerifyAll()
self.m.UnsetStubs()
def _get_fake_properties(self, sg='one'):
fake_groups_list = {
'security_groups': [
{
'id': 'fake_id_1',
'name': 'security_group_1',
'security_group_rules': [],
'description': 'no protocol'
},
{
'id': 'fake_id_2',
'name': 'security_group_2',
'security_group_rules': [],
'description': 'no protocol'
}
]
}
fixed_ip = {'subnet_id': 'fake_subnet_id'}
props = {
'admin_state_up': True,
'network_id': 'fake_network_id',
'fixed_ips': [fixed_ip],
'security_groups': ['fake_id_1']
}
if sg == 'zero':
props['security_groups'] = []
elif sg == 'one':
props['security_groups'] = ['fake_id_1']
elif sg == 'two':
props['security_groups'] = ['fake_id_1', 'fake_id_2']
return fake_groups_list, props
def test_instance_without_ip_address(self):
return_server = self.fc.servers.list()[3]
instance = self._create_test_instance(return_server,