Ed & Cam | Implement RuleComparator for checking equality of OpenStack and EC2 rules

This commit is contained in:
cameron-r 2014-10-29 17:28:10 -05:00
parent 90ddacba5f
commit 98843957fc
3 changed files with 149 additions and 0 deletions

33
rule_comparator.py Normal file
View File

@ -0,0 +1,33 @@
class RuleComparator:
def __init__(self, ec2_connection):
self.ec2_connection = ec2_connection
def rules_are_equal(self, openstack_rule, ec2_rule):
if self._ip_protocols_are_different(ec2_rule, openstack_rule)\
or self._from_ports_are_different(ec2_rule, openstack_rule)\
or self._to_ports_are_different(ec2_rule, openstack_rule)\
or self._ip_ranges_are_present_and_different(ec2_rule, openstack_rule)\
or self._group_names_are_present_and_different(openstack_rule, ec2_rule):
return False
return True
def _ip_protocols_are_different(self, ec2_rule, openstack_rule):
return openstack_rule['ip_protocol'] != ec2_rule.ip_protocol
def _from_ports_are_different(self, ec2_rule, openstack_rule):
return openstack_rule['from_port'] != ec2_rule.from_port
def _to_ports_are_different(self, ec2_rule, openstack_rule):
return openstack_rule['to_port'] != ec2_rule.to_port
def _ip_ranges_are_present_and_different(self, ec2_rule, openstack_rule):
return ('cidr' in openstack_rule['ip_range'] and openstack_rule['ip_range']['cidr'] != ec2_rule.grants[0].cidr_ip)
def _group_names_are_present_and_different(self, openstack_rule, ec2_rule):
if 'name' not in openstack_rule['group']:
return False
else:
openstack_group = openstack_rule['group']
ec2_group_name = self.ec2_connection.get_all_security_groups(ec2_rule.grants[0].group_id)[0].name
return openstack_group['name'] == ec2_group_name

View File

@ -0,0 +1,43 @@
from collections import namedtuple
class FakeEC2RuleBuilder():
EC2Rule = namedtuple('EC2Rule', 'ip_protocol from_port to_port grants')
GroupOrCIDR = namedtuple('GroupOrCIDR', 'cidr_ip group_id')
def __init__(self):
self.ip_protocol = 'udp'
self.from_port = 1111
self.to_port = 3333
self.ip_range = '0.0.0.0/0'
self.allowed_security_group_id = None
@staticmethod
def an_ec2_rule():
return FakeEC2RuleBuilder()
def with_ip_protocol(self, ip_protocol):
self.ip_protocol = ip_protocol
return self
def with_from_port(self, from_port):
self.from_port = from_port
return self
def with_to_port(self, to_port):
self.to_port = to_port
return self
def with_ip_range(self, ip_range):
self.ip_range = ip_range
self.allowed_security_group_id = None
return self
def with_allowed_security_group_id(self, allowed_security_group_id):
self.allowed_security_group_id = allowed_security_group_id
self.ip_range = None
return self
def build(self):
grants = [self.GroupOrCIDR(self.ip_range, self.allowed_security_group_id)]
return self.EC2Rule(self.ip_protocol, self.from_port, self.to_port, grants)

View File

@ -0,0 +1,73 @@
from collections import namedtuple
import unittest
from boto.ec2 import EC2Connection
from mock import Mock
from nova.virt.ec2.rule_comparator import RuleComparator
from fake_ec2_rule_builder import FakeEC2RuleBuilder
class TestRuleComparator(unittest.TestCase):
def setUp(self):
self.FakeSecurityGroup = namedtuple('FakeSecurityGroup', 'name')
self.openstack_rule = {
'ip_protocol': 'udp',
'from_port': 1111,
'to_port': 3333,
'ip_range': {'cidr': '0.0.0.0/0'},
'group': {}
}
self.ec2_connection = Mock(spec=EC2Connection)
self.rule_comparator = RuleComparator(self.ec2_connection)
def test_should_return_true_if_rules_have_matching_ip_protocol_from_port_to_port_and_ip_range(self):
ec2_rule = FakeEC2RuleBuilder.an_ec2_rule()\
.build()
self.assertTrue(self.rule_comparator.rules_are_equal(self.openstack_rule, ec2_rule))
def test_should_return_false_if_rules_have_different_ip_protocols(self):
ec2_rule = FakeEC2RuleBuilder.an_ec2_rule()\
.with_ip_protocol('tcp')\
.build()
self.assertFalse(self.rule_comparator.rules_are_equal(self.openstack_rule, ec2_rule))
def test_should_return_false_if_rules_have_different_from_ports(self):
ec2_rule = FakeEC2RuleBuilder.an_ec2_rule()\
.with_from_port(2222)\
.build()
self.assertFalse(self.rule_comparator.rules_are_equal(self.openstack_rule, ec2_rule))
def test_should_return_false_if_rules_have_different_to_ports(self):
ec2_rule = FakeEC2RuleBuilder.an_ec2_rule()\
.with_to_port(4444)\
.build()
self.assertFalse(self.rule_comparator.rules_are_equal(self.openstack_rule, ec2_rule))
def test_should_return_false_if_rules_have_different_ip_range(self):
ec2_rule = FakeEC2RuleBuilder.an_ec2_rule()\
.with_ip_range('1.1.1.1/1')\
.build()
self.assertFalse(self.rule_comparator.rules_are_equal(self.openstack_rule, ec2_rule))
def test_should_return_false_if_rules_have_allowed_groups_with_different_names(self):
self.openstack_rule['ip_range'] = {}
self.openstack_rule['group'] = {'name': 'secGroup'}
self.ec2_connection.get_all_security_groups.return_value = [self.FakeSecurityGroup('secGroup')]
ec2_rule = FakeEC2RuleBuilder.an_ec2_rule()\
.with_allowed_security_group_id(5)\
.build()
self.assertFalse(self.rule_comparator.rules_are_equal(self.openstack_rule, ec2_rule))
if __name__ == '__main__':
unittest.main()