Cam & Ed | Add tests for creating security groups and adding/removing them to instances
This commit is contained in:
parent
7abd25d739
commit
34bbbecc26
|
@ -565,8 +565,8 @@ class EC2Driver(driver.ComputeDriver):
|
|||
def _get_or_create_ec2_security_group(self, openstack_security_group):
|
||||
try:
|
||||
return self.ec2_conn.get_all_security_groups(openstack_security_group.name)[0]
|
||||
except EC2ResponseError as e:
|
||||
LOG.warning(e.body)
|
||||
except (EC2ResponseError, IndexError) as e:
|
||||
LOG.warning(e)
|
||||
return self.ec2_conn.create_security_group(openstack_security_group.name, openstack_security_group.description)
|
||||
|
||||
def refresh_security_group_rules(self, security_group_id):
|
||||
|
@ -576,6 +576,7 @@ class EC2Driver(driver.ComputeDriver):
|
|||
ec2_security_group = self._get_or_create_ec2_security_group(openstack_security_group)
|
||||
|
||||
ec2_ids_for_ec2_instances_with_security_group = self._get_ec2_instance_ids_with_security_group(ec2_security_group)
|
||||
|
||||
ec2_ids_for_openstack_instances_with_security_group = [
|
||||
instance.metadata['ec2_id'] for instance
|
||||
in self._get_openstack_instances_with_security_group(openstack_security_group)
|
||||
|
|
|
@ -0,0 +1,73 @@
|
|||
from time import sleep
|
||||
import unittest
|
||||
import urllib2
|
||||
from boto.exception import EC2ResponseError
|
||||
|
||||
from novaclient.v1_1 import client
|
||||
from ..ec2driver_config import *
|
||||
from ec2_test_base import EC2TestBase
|
||||
|
||||
from random import randint
|
||||
|
||||
class TestSecurityGroups(EC2TestBase):
|
||||
|
||||
def test_should_create_ec2_security_group_if_it_does_not_exist(self):
|
||||
instance, instance_id = self.spawn_ec2_instance()
|
||||
security_group = self.nova.security_groups.create("securityGroupName" + str(randint(1, 10000)), "Security group description")
|
||||
|
||||
self.nova.servers.add_security_group(instance.id, security_group.name)
|
||||
|
||||
matching_ec2_security_groups = self._wait_for_ec2_security_group_to_have_instance(security_group)
|
||||
self.assertEqual(len(matching_ec2_security_groups), 1)
|
||||
|
||||
@unittest.skipIf(os.environ.get('MOCK_EC2'), 'Not supported by moto')
|
||||
def test_should_add_security_group_to_ec2_instance(self):
|
||||
instance, instance_id = self.spawn_ec2_instance()
|
||||
security_group = self.nova.security_groups.create("securityGroupName" + str(randint(1, 10000)), "Security group description")
|
||||
|
||||
self.nova.servers.add_security_group(instance.id, security_group.name)
|
||||
|
||||
matching_ec2_security_groups = self._wait_for_ec2_security_group_to_have_instance(security_group)
|
||||
self.assertEqual(instance.metadata['ec2_id'], matching_ec2_security_groups[0].instances()[0].id)
|
||||
|
||||
@unittest.skipIf(os.environ.get('MOCK_EC2'), 'Not supported by moto')
|
||||
def test_should_remove_security_group_from_ec2_instance(self):
|
||||
# Setup
|
||||
instance, instance_id = self.spawn_ec2_instance()
|
||||
security_group = self.nova.security_groups.create("securityGroupName" + str(randint(1, 10000)), "Security group description")
|
||||
|
||||
self.nova.servers.add_security_group(instance.id, security_group.name)
|
||||
|
||||
matching_ec2_security_groups = self._wait_for_ec2_security_group_to_have_instance(security_group)
|
||||
|
||||
self.assertEqual(matching_ec2_security_groups[0].instances()[0].id, instance.metadata['ec2_id'])
|
||||
|
||||
# Action
|
||||
self.nova.servers.remove_security_group(instance.id, security_group.name)
|
||||
|
||||
# Assertion
|
||||
updated_matching_ec2_security_group = self._wait_for_ec2_group_to_have_no_instances(security_group)
|
||||
|
||||
self.assertEqual(updated_matching_ec2_security_group.instances(), [])
|
||||
|
||||
def test_should_delete_security_group(self):
|
||||
pass
|
||||
|
||||
def test_rules(self):
|
||||
pass
|
||||
|
||||
def _wait_for_ec2_group_to_have_no_instances(self, security_group):
|
||||
updated_matching_ec2_security_group = self.ec2_conn.get_all_security_groups(groupnames=security_group.name)[0]
|
||||
while updated_matching_ec2_security_group.instances():
|
||||
updated_matching_ec2_security_group = self.ec2_conn.get_all_security_groups(groupnames=security_group.name)[
|
||||
0]
|
||||
return updated_matching_ec2_security_group
|
||||
|
||||
def _wait_for_ec2_security_group_to_have_instance(self, security_group):
|
||||
matching_ec2_security_groups = self.ec2_conn.get_all_security_groups(groupnames=security_group.name)
|
||||
while not matching_ec2_security_groups:
|
||||
matching_ec2_security_groups = self.ec2_conn.get_all_security_groups(groupnames=security_group.name)
|
||||
return matching_ec2_security_groups
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Loading…
Reference in New Issue