diff --git a/akanda/rug/api/rest.py b/akanda/rug/api/akanda_client.py similarity index 97% rename from akanda/rug/api/rest.py rename to akanda/rug/api/akanda_client.py index 14db2a7a..51c43a71 100644 --- a/akanda/rug/api/rest.py +++ b/akanda/rug/api/akanda_client.py @@ -40,6 +40,9 @@ def update_config(host, port, config_dict): if response.status_code != requests.codes.ok: raise Exception('Config update failed: %s' % response.text) + else: + return response.json + def read_labels(host, port): path = AKANDA_BASE_PATH + 'firewall/labels' diff --git a/akanda/rug/api/configuration.py b/akanda/rug/api/configuration.py index 606abf97..c056e73d 100644 --- a/akanda/rug/api/configuration.py +++ b/akanda/rug/api/configuration.py @@ -35,16 +35,15 @@ def generate(client, router, interfaces): def load_provider_rules(path): try: - return jsonutils.load(file(path)) - except: + return jsonutils.load(open(path)) + except: #pragma nocove LOG.exception('unable to open provider rules: %s' % path) def generate_network_config(client, router, interfaces): iface_map = dict([(i['lladdr'], i['ifname']) for i in interfaces]) - - retval= [ + retval = [ _network_config( client, router.external_port, @@ -72,12 +71,13 @@ def _management_network_config(port, ifname, interfaces): for iface in interfaces: if iface['ifname'] == ifname: interface = iface - return _network_config_dict(iface, MANAGEMENT_NET, port.network_id) + return _make_network_config_dict( + iface, MANAGEMENT_NET, port.network_id) def _network_config(client, port, ifname, network_type, network_ports=[]): subnets = client.get_network_subnets(port.network_id) - return _network_config_dict( + return _make_network_config_dict( _interface_config(ifname, port, subnets), network_type, port.network_id, @@ -85,10 +85,9 @@ def _network_config(client, port, ifname, network_type, network_ports=[]): network_ports=network_ports) -def _network_config_dict(interface, network_type, network_id, - v4_conf=SERVICE_STATIC, v6_conf=SERVICE_STATIC, - subnets=[], network_ports=[]): - +def _make_network_config_dict(interface, network_type, network_id, + v4_conf=SERVICE_STATIC, v6_conf=SERVICE_STATIC, + subnets=[], network_ports=[]): return {'interface': interface, 'network_id': network_id, 'v4_conf_service': v4_conf, @@ -106,7 +105,7 @@ def _interface_config(ifname, port, subnets): subnet_lookup[fixed.subnet_id].cidr.prefixlen) return {'ifname': ifname, - 'addresses': [fmt(fixed) for fixed in port.fixed_ips]} + 'addresses': [fmt(fixed) for fixed in port.fixed_ips]} def _subnet_config(subnet): @@ -114,7 +113,8 @@ def _subnet_config(subnet): 'cidr': str(subnet.cidr), 'dhcp_enabled': subnet.enable_dhcp, 'dns_nameservers': subnet.dns_nameservers, - 'host_routes': subnet.host_routes + 'host_routes': subnet.host_routes, + 'gateway_ip': str(subnet.gateway_ip) } @@ -149,13 +149,10 @@ def generate_anchor_config(client, provider_rules, router): def generate_tenant_port_forward_anchor(client, router): - to_ip = router.external_port.first_v4 + to_ip = router.external_port.first_v4 or INVALID_IP - if not to_ip: - rules = [_format_port_forward_rule(to_ip, pf) - for pf in client.get_portforwards(router.tenant_id)] - else: - rules = [] + rules = [_format_port_forward_rule(to_ip, pf) + for pf in client.get_portforwards(router.tenant_id)] return { 'name': 'tenant_v4_portforwards', @@ -164,7 +161,7 @@ def generate_tenant_port_forward_anchor(client, router): def _format_port_forward_rule(to_ip, pf): - redirect_ip = pf.port.first_v4 + redirect_ip = pf.port.first_v4 or INVALID_IP if not redirect_ip: return @@ -188,16 +185,12 @@ def generate_tenant_filter_rule_anchor(client, router): } -def _format_filter_rules(rule): +def _format_filter_rule(rule): return { 'action': rule.action, 'protocol': rule.protocol, 'source': rule.source.name if rule.source else None, - 'source_port': source_port, + 'source_port': rule.source_port, 'destination': rule.destination.name if rule.destination else None, - 'destination_port': destination_port, + 'destination_port': rule.destination_port, } - - -def generate_label_config(): - return dict([l.split('=', 1) for l in cfg.CONF.destination_labels]) diff --git a/akanda/rug/api/nova.py b/akanda/rug/api/nova.py index 53774d0e..af80dfd8 100644 --- a/akanda/rug/api/nova.py +++ b/akanda/rug/api/nova.py @@ -24,7 +24,7 @@ class Nova(object): flavor=self.conf.router_instance_flavor, nics=nics) - def get_instance_id(self, router): + def get_instance(self, router): instances = self.client.servers.list( search_opts=dict(name='ak-' + router.id)) @@ -34,22 +34,22 @@ class Nova(object): return None def get_router_instance_status(self, router): - instances = self.client.servers.list( - search_opts=dict(name='ak-' + router.id)) - - if instances: - return instances[0].status + instance = self.get_instance(router) + if instance: + return instance.status else: return None - def destory_router_instance(self, router): - instance_id = self.get_instance_id(router) - if instance_id: - self.client.servers.destroy(instance_id) + def destroy_router_instance(self, router): + instance = self.get_instance(router) + if instance: + self.client.servers.destroy(instance.id) def reboot_router_instance(self, router): - instance_id = self.get_instance_id(router) - if instance_id: - self.client.servers.reboot(instance_id) + instance = self.get_instance(router) + if instance: + if instance.status != 'REBOOT': + # no need to reboot twice + self.client.servers.reboot(instance.id) else: self.create_router_instance(router) diff --git a/akanda/rug/api/quantum.py b/akanda/rug/api/quantum.py index 419f438a..1dffc0ec 100644 --- a/akanda/rug/api/quantum.py +++ b/akanda/rug/api/quantum.py @@ -69,7 +69,7 @@ class Subnet(object): self.network_id = network_id self.ip_version = ip_version self.cidr = netaddr.IPNetwork(cidr) - self.gateway_ip = gateway_ip + self.gateway_ip = netaddr.IPAddress(gateway_ip) self.enable_dhcp = enable_dhcp self.dns_nameservers = dns_nameservers self.host_routes = host_routes @@ -157,7 +157,7 @@ class FilterRule(object): self.source = source self.source_port = source_port self.destination = destination - self.desination_port = destination_port + self.destination_port = destination_port @classmethod def from_dict(cls, d): @@ -183,7 +183,7 @@ class FilterRule(object): class PortForward(object): def __init__(self, id_, name, protocol, public_port, private_port, port): - self.id_ = id + self.id = id_ self.name = name self.protocol = protocol self.public_port = public_port @@ -201,7 +201,7 @@ class PortForward(object): Port.from_dict(d['port'])) -class AkandaClientWrapper(client.Client): +class AkandaExtClientWrapper(client.Client): """Add client support for Akanda Extensions. """ addressgroup_path = '/dhaddressgroup' addressentry_path = '/dhaddressentry' @@ -209,7 +209,6 @@ class AkandaClientWrapper(client.Client): portalias_path = '/dhportalias' portforward_path = '/dhportforward' - # portalias crud @client.APIParamsCall def list_portalias(self, **params): @@ -266,7 +265,7 @@ class AkandaClientWrapper(client.Client): class Quantum(object): def __init__(self, conf): self.conf = conf - self.client = AkandaClientWrapper( + self.client = AkandaExtClientWrapper( username=conf.admin_user, password=conf.admin_password, tenant_name=conf.admin_tenant_name, @@ -376,15 +375,3 @@ class Quantum(object): driver.init_l3(driver.get_device_name(port), [rug_ip]) return port - - -class DummyConf: - admin_user='demo' - admin_password='secret' - admin_tenant_name='demo' - auth_url='http://192.168.57.100:5000/v2.0/' - auth_strategy='keystone' - auth_region='RegionOne' - -q= Quantum(DummyConf) - diff --git a/akanda/rug/common/cache.py b/akanda/rug/common/cache.py index faa8f32f..fb437924 100644 --- a/akanda/rug/common/cache.py +++ b/akanda/rug/common/cache.py @@ -3,25 +3,20 @@ import weakref LOG = logging.getLogger(__name__) + class RouterCache(object): def __init__(self): self.cache = {} self.router_by_tenant = weakref.WeakValueDictionary() self.router_by_tenant_network = weakref.WeakValueDictionary() - self.router_by_tenant_subnet = weakref.WeakValueDictionary() - self.router_by_port = weakref.WeakValueDictionary() def put(self, router): self.cache[router.id] = router self.router_by_tenant[router.tenant_id] = router for port in router.internal_ports: - self.router_by_port[port.id] = router self.router_by_tenant_network[port.network_id] = router - for fixed in port.fixed_ips: - self.router_by_tenant_subnet[fixed.subnet_id] = router - def remove(self, router_id): try: del self.cache[router_id] diff --git a/akanda/rug/common/notification.py b/akanda/rug/common/notification.py index 27e62af5..c5f32c35 100644 --- a/akanda/rug/common/notification.py +++ b/akanda/rug/common/notification.py @@ -9,7 +9,6 @@ _HANDLER_ATTR = '_notification_handle_event_type' class NotificationMixin(object): - def create_notification_listener(self, topic, exchange_name=None): self._notification_handlers = {} for method in inspect.getmembers(self, inspect.ismethod): @@ -24,7 +23,6 @@ class NotificationMixin(object): exchange_name=exchange_name) self.notification_connection.consume_in_thread() - def _notification_mixin_dispatcher(self, msg): try: handlers = self._notification_handlers.get(msg['event_type'], []) @@ -34,7 +32,7 @@ class NotificationMixin(object): else: if hasattr(self, 'default_notification_handler'): self.default_notification_handler( - event_type, + msg['event_type'], msg['_context_tenant_id'], msg['payload']) diff --git a/akanda/rug/common/task.py b/akanda/rug/common/task.py index 949a378f..71f479eb 100644 --- a/akanda/rug/common/task.py +++ b/akanda/rug/common/task.py @@ -6,14 +6,14 @@ LOG = logging.getLogger(__name__) class Task(object): - def __init__(self, method, data, max_attempts=9): + def __init__(self, method, data, max_attempts=3): self.method = method self.data = data self.current = 0 self.max_attempts = max_attempts def __call__(self): - self.current +=1 + self.current += 1 self.method(self.data) def should_retry(self): @@ -53,6 +53,8 @@ class TaskManager(object): if task.should_retry(): self.delay_queue.put(task) + else: + LOG.error('Task Error: %s' % task) def _requeue_failed(self): while True: diff --git a/akanda/rug/manager.py b/akanda/rug/manager.py index 45ca8e56..5cc72f1a 100644 --- a/akanda/rug/manager.py +++ b/akanda/rug/manager.py @@ -10,7 +10,7 @@ from akanda.rug.common import task from akanda.rug.api import configuration from akanda.rug.api import nova from akanda.rug.api import quantum -from akanda.rug.api import rest +from akanda.rug.api import akanda_client as router_api from akanda.rug.openstack.common import cfg from akanda.rug.openstack.common import context from akanda.rug.openstack.common import periodic_task @@ -44,14 +44,14 @@ OPTIONS = [ # listen for Quantum notification events cfg.StrOpt('notification_topic', - default='notifications.info', - help='Quantum notification topic name'), + default='notifications.info', + help='Quantum notification topic name'), cfg.StrOpt('quantum_control_exchange', - default='openstack', - help='Quantum control exchange name'), + default='openstack', + help='Quantum control exchange name'), cfg.StrOpt('control_exchange', - default='akanda', - help='Akanda control exchange name') + default='akanda', + help='Akanda control exchange name') ] cfg.CONF.register_opts(OPTIONS) @@ -95,8 +95,8 @@ class AkandaL3Manager(notification.NotificationMixin, @periodic_task.periodic_task(ticks_between_runs=15) def refresh_configs(self): LOG.debug('resync configuration state') - for rtr in self.cache.keys(): - self.task_mgr.put(self.update_config, rtr.id) + for rtr_id in self.cache.keys(): + self.task_mgr.put(self.update_config, rtr_id) # notification handlers def default_notifcation_handler(self, event_type, tenant_id, payload): @@ -117,7 +117,6 @@ class AkandaL3Manager(notification.NotificationMixin, if rtr: self.task_mgr.put(self.update_router, rtr.id) - @notification.handles('router.create.end') def handle_router_create_notification(self, tenant_id, payload): self.task_mgr.put(self._spawn_router, payload['router']['id']) @@ -140,7 +139,7 @@ class AkandaL3Manager(notification.NotificationMixin, self.task_mgr.put(self.update_router, rtr.id) for rtr_id in (known_routers - active_routers): - self.task_mgr.put(self.destory_router, rtr.id) + self.task_mgr.put(self.destroy_router, rtr.id) def update_router(self, router_id): LOG.debug('Updating router: %s' % router_id) @@ -159,7 +158,7 @@ class AkandaL3Manager(notification.NotificationMixin, LOG.debug('Destroying router: %s' % router_id) rtr = self.cache.get(router_id) if rtr: - self.nova.destory_router_instance(rtr) + self.nova.destroy_router_instance(rtr) self.cache.remove(rtr) def reboot_router(self, router): @@ -174,26 +173,28 @@ class AkandaL3Manager(notification.NotificationMixin, def update_config(self, router): LOG.debug('Updating router %s config' % router.id) - interfaces = rest.get_interfaces(_get_management_address(router), - cfg.CONF.akanda_mgt_service_port) + interfaces = router_api.get_interfaces( + _get_management_address(router), + cfg.CONF.akanda_mgt_service_port) config = configuration.generate(self.quantum, router, interfaces) import pprint pprint.pprint(config) - rest.update_config(_get_management_address(router), - cfg.CONF.akanda_mgt_service_port, - config) + router_api.update_config(_get_management_address(router), + cfg.CONF.akanda_mgt_service_port, + config) LOG.debug('Router %s config updated.' % router.id) def router_is_alive(self, router): - return rest.is_alive(_get_management_address(router), - cfg.CONF.akanda_mgt_service_port) + return router_api.is_alive(_get_management_address(router), + cfg.CONF.akanda_mgt_service_port) def verify_router_interfaces(self, router): try: - interfaces = rest.get_interfaces(_get_management_address(router), - cfg.CONF.akanda_mgt_service_port) + interfaces = router_api.get_interfaces( + _get_management_address(router), + cfg.CONF.akanda_mgt_service_port) router_macs = set([iface['lladdr'] for iface in interfaces]) @@ -207,8 +208,10 @@ class AkandaL3Manager(notification.NotificationMixin, def report_bandwidth(self, router): try: - bandwidth = rest.read_labels(_get_management_address(router), - cfg.CONF.akanda_mgt_service_port) + bandwidth = router_api.read_labels( + _get_management_address(router), + cfg.CONF.akanda_mgt_service_port) + if bandwidth: message = { 'tenant_id': router.tenant_id, @@ -239,6 +242,7 @@ class AkandaL3Manager(notification.NotificationMixin, return router + def _get_management_address(router): prefix, prefix_len = cfg.CONF.management_prefix.split('/', 1) eui = netaddr.EUI(router.management_port.mac_address) diff --git a/akanda/rug/service.py b/akanda/rug/service.py index 388c1852..e6623752 100644 --- a/akanda/rug/service.py +++ b/akanda/rug/service.py @@ -9,7 +9,7 @@ from akanda.rug.openstack.common import service cfg.CONF.register_opts([ cfg.IntOpt('periodic_interval', - default=10, + default=60, help='seconds between running periodic tasks (ie health check)') ]) diff --git a/etc/provider_rules.json b/etc/provider_rules.json new file mode 100644 index 00000000..8f99b7c3 --- /dev/null +++ b/etc/provider_rules.json @@ -0,0 +1,4 @@ +{"labels": {"ext": ["192.168.57.0/24"]}, + "prerules": [], + "postrules": [] +} diff --git a/etc/rug.ini b/etc/rug.ini new file mode 100644 index 00000000..593df329 --- /dev/null +++ b/etc/rug.ini @@ -0,0 +1,27 @@ +[DEFAULT] +admin_user=quantum +admin_password=password +admin_tenant_name=service +auth_url=http://192.168.57.100:35357/v2.0/ +auth_strategy=keystone +auth_region=RegionOne + +management_prefix=fdca:3ba5:a17a:acda::/64 +akanda_mgt_service_port=5000 + +management_network_id=7860030c-fc43-45d1-88d1-5863a04e9ebf +management_subnet_id=25f3586d-fa0a-434a-8001-d4849ab514fd +external_network_id=3a9a71ce-8bea-4b6f-82ca-d74ee672895c + +router_image_uuid=27334edd-49b8-4a2d-8dae-2f37af5126c7 +router_instance_flavor=1 + +# to plug in rug interface +root_helper=sudo +interface_driver=quantum.agent.linux.interface.OVSInterfaceDriver +ovs_integration_bridge=br-int + +rabbit_password = yetanothersecret +rabbit_host = 192.168.57.100 + +provider_rules_path=/opt/stack/akanda-rug/akanda/rug/provider_rules.json diff --git a/setup.py b/setup.py index 07580b30..ec844195 100644 --- a/setup.py +++ b/setup.py @@ -11,7 +11,8 @@ setup( license='BSD', install_requires=[ 'netaddr>=0.7.7', - 'requests>=0.14.1' + 'requests>=0.14.1', + 'python-quantumclient>=2.1' ], namespace_packages=['akanda'], packages=find_packages(), diff --git a/test/unit/api/__init__.py b/test/unit/api/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/test/unit/api/test_akanda_router.py b/test/unit/api/test_akanda_router.py new file mode 100644 index 00000000..95f5973a --- /dev/null +++ b/test/unit/api/test_akanda_router.py @@ -0,0 +1,103 @@ +import mock +import unittest2 as unittest + +from akanda.rug.api import akanda_client + + +class TestAkandaClient(unittest.TestCase): + def test_mgt_url(self): + self.assertEqual('http://[fe80::2]:5000/', + akanda_client._mgt_url('fe80::2', 5000, '/')) + self.assertEqual('http://192.168.1.1:5000/', + akanda_client._mgt_url('192.168.1.1', 5000, '/')) + + def test_is_alive_success(self): + with mock.patch('requests.get') as request_get: + response = mock.Mock() + response.status_code = 200 + request_get.return_value = response + + self.assertTrue(akanda_client.is_alive('fe80::2', 5000)) + request_get.assert_called_once_with( + 'http://[fe80::2]:5000/v1/firewall/labels', + timeout=1.0) + + def test_is_alive_bad_status(self): + with mock.patch('requests.get') as request_get: + response = mock.Mock() + response.status_code = 500 + request_get.return_value = response + + self.assertFalse(akanda_client.is_alive('fe80::2', 5000)) + request_get.assert_called_once_with( + 'http://[fe80::2]:5000/v1/firewall/labels', + timeout=1.0) + + def test_is_alive_exception(self): + with mock.patch('requests.get') as request_get: + request_get.side_effect = Exception + + self.assertFalse(akanda_client.is_alive('fe80::2', 5000)) + request_get.assert_called_once_with( + 'http://[fe80::2]:5000/v1/firewall/labels', + timeout=1.0) + + def test_get_interfaces(self): + with mock.patch('requests.get') as request_get: + response = mock.Mock() + response.status_code = 200 + response.json = {'interfaces': 'the_interfaces'} + request_get.return_value = response + + self.assertEqual(akanda_client.get_interfaces('fe80::2', 5000), + 'the_interfaces') + request_get.assert_called_once_with( + 'http://[fe80::2]:5000/v1/system/interfaces') + + def test_update_config(self): + config = {'foo': 'bar'} + + with mock.patch('requests.put') as request_put: + mock_response = mock.Mock() + mock_response.status_code = 200 + mock_response.json = config + request_put.return_value = mock_response + + resp = akanda_client.update_config('fe80::2', 5000, config) + + request_put.assert_called_once_with( + 'http://[fe80::2]:5000/v1/system/config', + data='{"foo": "bar"}', + headers={'Content-type': 'application/json'}) + self.assertEqual(resp, config) + + def test_update_config_failure(self): + config = {'foo': 'bar'} + + with mock.patch('requests.put') as request_put: + mock_response = mock.Mock() + mock_response.status_code = 500 + mock_response.text = 'error text' + request_put.return_value = mock_response + + with self.assertRaises(Exception): + akanda_client.update_config('fe80::2', 5000, config) + + request_put.assert_called_once_with( + 'http://[fe80::2]:5000/v1/system/config', + data='{"foo": "bar"}', + headers={'Content-type': 'application/json'}) + + def test_read_labels(self): + with mock.patch('requests.post') as request_post: + mock_response = mock.Mock() + mock_response.status_code = 200 + mock_response.json = {'labels': ['label1', 'label2']} + request_post.return_value = mock_response + + resp = akanda_client.read_labels('fe80::2', 5000) + + request_post.assert_called_once_with( + 'http://[fe80::2]:5000/v1/firewall/labels') + + self.assertEqual(resp, ['label1', 'label2']) diff --git a/test/unit/api/test_configuration.py b/test/unit/api/test_configuration.py new file mode 100644 index 00000000..6bf86572 --- /dev/null +++ b/test/unit/api/test_configuration.py @@ -0,0 +1,352 @@ +import mock +import netaddr +import unittest2 as unittest + +from akanda.rug.api import configuration as conf_mod +from akanda.rug.openstack.common import cfg + + +class FakeModel(object): + def __init__(self, id_, **kwargs): + self.id = id_ + self.__dict__.update(kwargs) + +fake_ext_port = FakeModel( + '1', + mac_address='aa:bb:cc:dd:ee:ff', + network_id='ext-net', + fixed_ips=[FakeModel('', ip_address='9.9.9.9', subnet_id='s2')], + first_v4='9.9.9.9') + + +fake_mgt_port = FakeModel( + '2', + mac_address='aa:bb:cc:cc:bb:aa', + network_id='mgt-net') + +fake_int_port = FakeModel( + '3', + mac_address='aa:aa:aa:aa:aa:aa', + network_id='int-net', + fixed_ips=[FakeModel('', ip_address='192.168.1.1', subnet_id='s1')]) + +fake_vm_port = FakeModel( + '4', + mac_address='aa:aa:aa:aa:aa:bb', + network_id='int-net', + fixed_ips=[FakeModel('', ip_address='192.168.1.2', subnet_id='s1')], + first_v4='192.168.1.2') + +fake_subnet = FakeModel( + 's1', + cidr=netaddr.IPNetwork('192.168.1.0/24'), + gateway_ip='192.168.1.1', + enable_dhcp=True, + dns_nameservers=['8.8.8.8'], + host_routes={}) + +fake_router = FakeModel( + 'router_id', + tenant_id='tenant_id', + external_port=fake_ext_port, + management_port=fake_mgt_port, + internal_ports=[fake_int_port]) + + +class TestAkandaClient(unittest.TestCase): + def setUp(self): + cfg.CONF.set_override('provider_rules_path', '/the/path') + + def tearDown(self): + cfg.CONF.reset() + + def test_generate(self): + methods = { + 'load_provider_rules': mock.DEFAULT, + 'generate_network_config': mock.DEFAULT, + 'generate_address_book_config': mock.DEFAULT, + 'generate_anchor_config': mock.DEFAULT + } + + mock_client = mock.Mock() + ifaces = [] + provider_rules = {'labels': {'ext': ['192.168.1.1']}} + + with mock.patch.multiple(conf_mod, **methods) as mocks: + mocks['load_provider_rules'].return_value = provider_rules + mocks['generate_network_config'].return_value = 'network_config' + mocks['generate_address_book_config'].return_value = 'ab_config' + mocks['generate_anchor_config'].return_value = 'anchor_config' + + config = conf_mod.generate(mock_client, fake_router, ifaces) + + expected = { + 'networks': 'network_config', + 'address_book': 'ab_config', + 'anchors': 'anchor_config', + 'labels': {'ext': ['192.168.1.1']}} + + self.assertEqual(config, expected) + + mocks['load_provider_rules'].assert_called_once_with('/the/path') + mocks['generate_network_config'].assert_called_once_with( + mock_client, fake_router, ifaces) + mocks['generate_address_book_config'].assert_called_once_with( + mock_client, fake_router) + mocks['generate_anchor_config'].assert_called_once_with( + mock_client, provider_rules, fake_router) + + def test_load_provider_rules(self): + rules_dict = {'labels': {}, 'preanchors': [], 'postanchors': []} + with mock.patch('akanda.rug.openstack.common.jsonutils.load') as load: + load.return_value = rules_dict + with mock.patch('__builtin__.open') as mock_open: + r = conf_mod.load_provider_rules('/the/path') + + mock_open.assert_called_once_with('/the/path') + load.assert_called_once_with(mock_open.return_value) + self.assertEqual(r, rules_dict) + + def test_generate_network_config(self): + methods = { + '_network_config': mock.DEFAULT, + '_management_network_config': mock.DEFAULT, + } + + mock_client = mock.Mock() + + ifaces = [ + {'ifname': 'ge0', 'lladdr': fake_mgt_port.mac_address}, + {'ifname': 'ge1', 'lladdr': fake_ext_port.mac_address}, + {'ifname': 'ge2', 'lladdr': fake_int_port.mac_address} + ] + + with mock.patch.multiple(conf_mod, **methods) as mocks: + mocks['_network_config'].return_value = 'configured_network' + mocks['_management_network_config'].return_value = 'mgt_net' + + result = conf_mod.generate_network_config( + mock_client, fake_router, ifaces) + + expected = [ + 'configured_network', + 'mgt_net', + 'configured_network' + ] + + self.assertEqual(result, expected) + + mocks['_network_config'].assert_has_calls([ + mock.call( + mock_client, + fake_router.external_port, + 'ge1', + 'external'), + mock.call( + mock_client, + fake_int_port, + 'ge2', + 'internal', + mock.ANY)]) + + mocks['_management_network_config'].assert_called_once_with( + fake_router.management_port, 'ge0', ifaces) + + def test_managment_network_config(self): + with mock.patch.object(conf_mod, '_make_network_config_dict') as nc: + interface = { + 'ifname': 'ge0', + } + + ifaces = [interface] + + conf_mod._management_network_config(fake_mgt_port, 'ge0', ifaces) + nc.assert_called_once_with(interface, 'management', 'mgt-net') + + def test_network_config(self): + subnets = [fake_subnet] + mock_client = mock.Mock() + mock_client.get_network_subnets.return_value = subnets + + with mock.patch.object(conf_mod, '_make_network_config_dict') as nc: + with mock.patch.object(conf_mod, '_interface_config') as ic: + mock_interface = mock.Mock() + ic.return_value = mock_interface + + conf_mod._network_config( + mock_client, + fake_int_port, + 'ge1', + 'internal', + []) + + ic.assert_called_once_with('ge1', fake_int_port, subnets) + nc.assert_called_once_with( + mock_interface, + 'internal', + 'int-net', + subnets=subnets, + network_ports=[]) + + def test_make_network_config(self): + interface = {'ifname': 'ge2'} + + result = conf_mod._make_network_config_dict( + interface, + 'internal', + fake_int_port.network_id, + 'dhcp', + 'ra', + subnets=[fake_subnet], + network_ports=[fake_vm_port]) + + expected = { + 'interface': interface, + 'network_id': fake_int_port.network_id, + 'v4_conf_service': 'dhcp', + 'v6_conf_service': 'ra', + 'network_type': 'internal', + 'subnets': [{'cidr': '192.168.1.0/24', + 'dhcp_enabled': True, + 'dns_nameservers': ['8.8.8.8'], + 'gateway_ip': '192.168.1.1', + 'host_routes': {}}], + 'allocations': [('aa:aa:aa:aa:aa:bb', + '192-168-1-2.local', + '192.168.1.2')] + } + + self.assertEqual(result, expected) + + def test_interface_config(self): + expected = {'addresses': ['192.168.1.1/24'], 'ifname': 'ge1'} + + self.assertEqual( + expected, + conf_mod._interface_config('ge1', fake_int_port, [fake_subnet])) + + def test_subnet_config(self): + expected = { + 'cidr': '192.168.1.0/24', + 'dhcp_enabled': True, + 'dns_nameservers': ['8.8.8.8'], + 'gateway_ip': '192.168.1.1', + 'host_routes': {} + } + + self.assertEqual(conf_mod._subnet_config(fake_subnet), expected) + + def test_allocation_config(self): + expected = [('aa:aa:aa:aa:aa:bb', '192-168-1-2.local', '192.168.1.2')] + self.assertEqual(conf_mod._allocation_config([fake_vm_port]), expected) + + def test_generate_address_book_config(self): + fake_address_group = FakeModel( + 'g1', + name='local_net', + entries=[netaddr.IPNetwork('10.0.0.0/8')]) + + mock_client = mock.Mock() + mock_client.get_addressgroups.return_value = [fake_address_group] + + result = conf_mod.generate_address_book_config(mock_client, + fake_router) + + expected = {'local_net': ['10.0.0.0/8']} + self.assertEqual(result, expected) + + def test_generate_anchor_config(self): + mock_client = mock.Mock() + provider_rules = { + 'preanchors': ['pre'], + 'postanchors': ['post'] + } + + methods = { + 'generate_tenant_port_forward_anchor': mock.DEFAULT, + 'generate_tenant_filter_rule_anchor': mock.DEFAULT + } + + with mock.patch.multiple(conf_mod, **methods) as mocks: + mocks['generate_tenant_port_forward_anchor'].return_value = 'fwd' + mocks['generate_tenant_filter_rule_anchor'].return_value = 'filter' + + result = conf_mod.generate_anchor_config( + mock_client, provider_rules, fake_router) + + expected = ['pre', 'fwd', 'filter', 'post'] + self.assertEqual(result, expected) + + def test_generate_port_forward_anchor(self): + port_forward = FakeModel( + 'pf1', + protocol='tcp', + public_port=8080, + private_port=80, + port=fake_vm_port) + + mock_client = mock.Mock() + mock_client.get_portforwards.return_value = [port_forward] + + result = conf_mod.generate_tenant_port_forward_anchor( + mock_client, fake_router) + + expected = { + 'name': 'tenant_v4_portforwards', + 'rules': [ + {'action': 'pass', + 'family': 'inet', + 'protocol': 'tcp', + 'redirect': '192.168.1.2', + 'redirect_port': 80, + 'to': '9.9.9.9/32', + 'to_port': 8080} + ] + } + + self.assertEqual(result, expected) + + def test_generate_filter_rule_anchor(self): + dest_rule = FakeModel( + 'fr1', + action='pass', + protocol='tcp', + source=None, + source_port=None, + destination=FakeModel('d1', name='webservers'), + destination_port=80) + + source_rule = FakeModel( + 'fr1', + action='pass', + protocol='tcp', + source=FakeModel('s1', name='home'), + source_port=None, + destination=None, + destination_port=22) + + mock_client = mock.Mock() + mock_client.get_filterrules.return_value = [dest_rule, source_rule] + + result = conf_mod.generate_tenant_filter_rule_anchor( + mock_client, fake_router) + + expected = { + 'name': 'tenant_filterrules', + 'rules': [{'action': 'pass', + 'destination': 'webservers', + 'destination_port': 80, + 'protocol': 'tcp', + 'source': None, + 'source_port': None}, + {'action': 'pass', + 'destination': None, + 'destination_port': 22, + 'protocol': 'tcp', + 'source': 'home', + 'source_port': None} + ] + } + + self.assertEqual(result, expected) + diff --git a/test/unit/api/test_nova_wrapper.py b/test/unit/api/test_nova_wrapper.py new file mode 100644 index 00000000..812db183 --- /dev/null +++ b/test/unit/api/test_nova_wrapper.py @@ -0,0 +1,165 @@ +import mock +import unittest2 as unittest + +from akanda.rug.api import nova + +class FakeModel(object): + def __init__(self, id_, **kwargs): + self.id = id_ + self.__dict__.update(kwargs) + +fake_ext_port = FakeModel( + '1', + mac_address='aa:bb:cc:dd:ee:ff', + network_id='ext-net', + fixed_ips=[FakeModel('', ip_address='9.9.9.9', subnet_id='s2')]) + +fake_mgt_port = FakeModel( + '2', + mac_address='aa:bb:cc:cc:bb:aa', + network_id='mgt-net') + +fake_int_port = FakeModel( + '3', + mac_address='aa:aa:aa:aa:aa:aa', + network_id='int-net', + fixed_ips=[FakeModel('', ip_address='192.168.1.1', subnet_id='s1')]) + +fake_router = FakeModel( + 'router_id', + tenant_id='tenant_id', + external_port=fake_ext_port, + management_port=fake_mgt_port, + internal_ports=[fake_int_port]) + + +class FakeConf: + admin_user='admin' + admin_password='password' + admin_tenant_name='admin' + auth_url='http://127.0.0.1/' + auth_strategy='keystone' + auth_region='RegionOne' + router_image_uuid='akanda-image' + router_instance_flavor=1 + + +class TestNovaWrapper(unittest.TestCase): + def setUp(self): + self.addCleanup(mock.patch.stopall) + patch = mock.patch('novaclient.v1_1.client.Client') + self.client = mock.Mock() + self.client_cls = patch.start() + self.client_cls.return_value = self.client + self.nova = nova.Nova(FakeConf) + + + def test_create_router_instance(self): + expected = [ + mock.call.servers.create( + 'ak-router_id', + nics=[{'port-id': '2', + 'net-id': 'mgt-net', + 'v4-fixed-ip': ''}, + {'port-id': '1', + 'net-id': 'ext-net', + 'v4-fixed-ip': ''}, + {'port-id': '3', + 'net-id': 'int-net', + 'v4-fixed-ip': ''}], + flavor=1, + image='akanda-image' + ) + ] + + self.nova.create_router_instance(fake_router) + self.client.assert_has_calls(expected) + + + def test_get_instance(self): + instance = mock.Mock() + self.client.servers.list.return_value = [instance] + + expected = [ + mock.call.servers.list(search_opts={'name': 'ak-router_id'}) + ] + + result = self.nova.get_instance(fake_router) + self.client.assert_has_calls(expected) + self.assertEqual(result, instance) + + def test_get_instance_not_found(self): + self.client.servers.list.return_value = [] + + expected = [ + mock.call.servers.list(search_opts={'name': 'ak-router_id'}) + ] + + result = self.nova.get_instance(fake_router) + self.client.assert_has_calls(expected) + self.assertIsNone(result) + + def test_get_router_instance_status(self): + instance = mock.Mock() + instance.status = 'ACTIVE' + self.client.servers.list.return_value = [instance] + + expected = [ + mock.call.servers.list(search_opts={'name': 'ak-router_id'}) + ] + + result = self.nova.get_router_instance_status(fake_router) + self.client.assert_has_calls(expected) + self.assertEqual(result, 'ACTIVE') + + def test_get_router_instance_status_not_found(self): + self.client.servers.list.return_value = [] + + expected = [ + mock.call.servers.list(search_opts={'name': 'ak-router_id'}) + ] + + result = self.nova.get_router_instance_status(fake_router) + self.client.assert_has_calls(expected) + self.assertIsNone(result) + + + def test_destory_router_instance(self): + with mock.patch.object(self.nova, 'get_instance') as get_instance: + get_instance.return_value.id='instance_id' + + expected = [ + mock.call.servers.destroy('instance_id') + ] + + self.nova.destroy_router_instance(fake_router) + self.client.assert_has_calls(expected) + + def test_reboot_router_instance_exists(self): + with mock.patch.object(self.nova, 'get_instance') as get_instance: + get_instance.return_value.id='instance_id' + get_instance.return_value.status='ACTIVE' + + expected = [ + mock.call.servers.reboot('instance_id') + ] + + self.nova.reboot_router_instance(fake_router) + self.client.assert_has_calls(expected) + + def test_reboot_router_instance_rebooting(self): + with mock.patch.object(self.nova, 'get_instance') as get_instance: + get_instance.return_value.id='instance_id' + get_instance.return_value.status='REBOOT' + + self.nova.reboot_router_instance(fake_router) + self.assertEqual(self.client.mock_calls, []) + + def test_reboot_router_instance_missing(self): + with mock.patch.object(self.nova, 'get_instance') as get_instance: + with mock.patch.object(self.nova, 'create_router_instance') as cr: + get_instance.return_value = None + + self.nova.reboot_router_instance(fake_router) + self.assertEqual(self.client.mock_calls, []) + cr.assert_called_once_with(fake_router) diff --git a/test/unit/api/test_quantum_wrapper.py b/test/unit/api/test_quantum_wrapper.py new file mode 100644 index 00000000..97eb4093 --- /dev/null +++ b/test/unit/api/test_quantum_wrapper.py @@ -0,0 +1,188 @@ +import mock +import netaddr +import unittest2 as unittest + +from akanda.rug.api import quantum + + +class TestQuantumModels(unittest.TestCase): + def test_router(self): + r = quantum.Router( + '1', 'tenant_id', 'name', True, 'ext', ['int'], 'mgt') + self.assertEqual(r.id, '1') + self.assertEqual(r.tenant_id, 'tenant_id') + self.assertEqual(r.name, 'name') + self.assertTrue(r.admin_state_up) + self.assertEqual(r.external_port, 'ext') + self.assertEqual(r.management_port, 'mgt') + self.assertEqual(r.internal_ports, ['int']) + + + def test_router_from_dict(self): + d = { + 'id': '1', + 'tenant_id': 'tenant_id', + 'name': 'name', + 'admin_state_up': True, + 'external_port': 'ext', + 'ports' : [] + } + + r = quantum.Router.from_dict(d) + + self.assertEqual(r.id, '1') + self.assertEqual(r.tenant_id, 'tenant_id') + self.assertEqual(r.name, 'name') + self.assertTrue(r.admin_state_up) + self.assertEqual(r.external_port, 'ext') + + def test_router_eq(self): + r1 = quantum.Router( + '1', 'tenant_id', 'name', True, 'ext', ['int'], 'mgt') + r2 = quantum.Router( + '1', 'tenant_id', 'name', True, 'ext', ['int'], 'mgt') + + self.assertEqual(r1, r2) + + def test_router_ne(self): + r1 = quantum.Router( + '1', 'tenant_id', 'name', True, 'ext', ['int'], 'mgt') + r2 = quantum.Router( + '2', 'tenant_id', 'name', True, 'ext', ['int'], 'mgt') + + self.assertNotEqual(r1, r2) + + + def test_subnet_model(self): + d = { + 'id': '1', + 'tenant_id': 'tenant_id', + 'name': 'name', + 'network_id': 'network_id', + 'ip_version': 6, + 'cidr': 'fe80::/64', + 'gateway_ip': 'fe80::1', + 'enable_dhcp': True, + 'dns_nameservers': ['8.8.8.8', '8.8.4.4'], + 'host_routes': [] + } + + s = quantum.Subnet.from_dict(d) + + self.assertEqual(s.id, '1') + self.assertEqual(s.tenant_id, 'tenant_id') + self.assertEqual(s.name, 'name') + self.assertEqual(s.network_id, 'network_id') + self.assertEqual(s.ip_version, 6) + self.assertEqual(s.cidr, netaddr.IPNetwork('fe80::/64')) + self.assertEqual(s.gateway_ip, netaddr.IPAddress('fe80::1')) + self.assertTrue(s.enable_dhcp, True) + self.assertEqual(s.dns_nameservers, ['8.8.8.8', '8.8.4.4']) + self.assertEqual(s.host_routes, []) + + def test_port_model(self): + d = { + 'id': '1', + 'device_id': 'device_id', + 'fixed_ips': [{'ip_address': '192.168.1.1', 'subnet_id': 'sub1'}], + 'mac_address': 'aa:bb:cc:dd:ee:ff', + 'network_id': 'net_id', + 'device_owner': 'test' + } + + p = quantum.Port.from_dict(d) + + self.assertEqual(p.id, '1') + self.assertEqual(p.device_id, 'device_id') + self.assertEqual(p.mac_address, 'aa:bb:cc:dd:ee:ff') + self.assertEqual(p.device_owner, 'test') + self.assertEqual(len(p.fixed_ips), 1) + + + def test_fixed_ip_model(self): + d = { + 'subnet_id': 'sub1', + 'ip_address': '192.168.1.1' + } + + fip = quantum.FixedIp.from_dict(d) + + self.assertEqual(fip.subnet_id, 'sub1') + self.assertEqual(fip.ip_address, netaddr.IPAddress('192.168.1.1')) + + def test_addressgroup_model(self): + d = { + 'id': '1', + 'name': 'group1', + 'entries': [{'cidr': '192.168.1.1/24'}] + } + + g = quantum.AddressGroup.from_dict(d) + + self.assertEqual(g.id, '1') + self.assertEqual(g.name, 'group1') + self.assertEqual(g.entries, [netaddr.IPNetwork('192.168.1.1/24')]) + + + def test_filterrule_model(self): + d = { + 'id': '1', + 'action': 'pass', + 'protocol': 'tcp', + 'source': {'id': '1', + 'name': 'group', + 'entries': [{'cidr': '192.168.1.1/24'}]}, + 'source_port': None, + 'destination': None, + 'destination_port': 80 + } + + r = quantum.FilterRule.from_dict(d) + + self.assertEqual(r.id, '1') + self.assertEqual(r.action, 'pass') + self.assertEqual(r.protocol, 'tcp') + self.assertEqual(r.source.name, 'group') + self.assertIsNone(r.source_port) + self.assertIsNone(r.destination) + self.assertEqual(r.destination_port, 80) + + def test_portforward_model(self): + p = { + 'id': '1', + 'device_id': 'device_id', + 'fixed_ips': [{'ip_address': '192.168.1.1', 'subnet_id': 'sub1'}], + 'mac_address': 'aa:bb:cc:dd:ee:ff', + 'network_id': 'net_id', + 'device_owner': 'test' + } + + d = { + 'id': '1', + 'name': 'name', + 'protocol': 'tcp', + 'public_port': 8022, + 'private_port': 22, + 'port': p + } + + fw = quantum.PortForward.from_dict(d) + + self.assertEqual(fw.id, '1') + self.assertEqual(fw.name, 'name') + self.assertEqual(fw.protocol, 'tcp') + self.assertEqual(fw.public_port, 8022) + self.assertEqual(fw.private_port, 22) + self.assertEqual(fw.port.device_id, 'device_id') + +class FakeConf: + admin_user='admin' + admin_password='password' + admin_tenant_name='admin' + auth_url='http://127.0.0.1/' + auth_strategy='keystone' + auth_region='RegionOne' + + +class TestAkandaClientWrapper(unittest.TestCase): + pass diff --git a/test/unit/common/__init__.py b/test/unit/common/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/test/unit/common/test_cache.py b/test/unit/common/test_cache.py new file mode 100644 index 00000000..311f354c --- /dev/null +++ b/test/unit/common/test_cache.py @@ -0,0 +1,98 @@ +import unittest2 as unittest + +from akanda.rug.common import cache + + +class FakeModel: + def __init__(self, id_, **kwargs): + self.id = id_ + self.__dict__.update(kwargs) + + def __str__(self): + return str(self.__dict__) + + +class TestCache(unittest.TestCase): + def test_init(self): + c = cache.RouterCache() + + def test_put(self): + fake_router = FakeModel( + 'the_id', + tenant_id='tenant_id', + internal_ports=[FakeModel('port_id', network_id='net1')]) + + c = cache.RouterCache() + c.put(fake_router) + + self.assertEqual(c.cache, {'the_id': fake_router}) + self.assertEqual(c.router_by_tenant.keys(), ['tenant_id']) + self.assertEqual(c.router_by_tenant_network.keys(), ['net1']) + + def test_remove(self): + fake_router = FakeModel( + 'the_id', + tenant_id='tenant_id', + internal_ports=[FakeModel('port_id', network_id='net1')]) + + c = cache.RouterCache() + c.put(fake_router) + c.remove(fake_router.id) + del fake_router + + self.assertEqual(len(c.cache), 0) + self.assertEqual(len(c.router_by_tenant), 0) + self.assertEqual(len(c.router_by_tenant_network), 0) + + def test_remove_nonexistent(self): + c = cache.RouterCache() + c.remove('bad_id') + self.assertEqual(len(c.cache), 0) + self.assertEqual(len(c.router_by_tenant), 0) + self.assertEqual(len(c.router_by_tenant_network), 0) + + def test_get(self): + fake_router = FakeModel( + 'the_id', + tenant_id='tenant_id', + internal_ports=[FakeModel('port_id', network_id='net1')]) + + c = cache.RouterCache() + c.put(fake_router) + + self.assertEqual(c.get('the_id'), fake_router) + self.assertEqual(c.get_by_tenant_id('tenant_id'), fake_router) + + def test_keys(self): + fake_router1 = FakeModel( + 'the_id', + tenant_id='tenant_id', + internal_ports=[FakeModel('port_id', network_id='net1')]) + + fake_router2 = FakeModel( + 'the_2nd', + tenant_id='tenant_id2', + internal_ports=[FakeModel('port_id', network_id='net2')]) + + c = cache.RouterCache() + c.put(fake_router1) + c.put(fake_router2) + + self.assertItemsEqual(c.keys(), ['the_id', 'the_2nd']) + + def test_routers(self): + fake_router1 = FakeModel( + 'the_id', + tenant_id='tenant_id', + internal_ports=[FakeModel('port_id', network_id='net1')]) + + fake_router2 = FakeModel( + 'the_2nd', + tenant_id='tenant_id2', + internal_ports=[FakeModel('port_id', network_id='net2')]) + + c = cache.RouterCache() + c.put(fake_router1) + c.put(fake_router2) + + self.assertItemsEqual(c.routers(), [fake_router1, fake_router2]) diff --git a/test/unit/common/test_notification.py b/test/unit/common/test_notification.py new file mode 100644 index 00000000..9f140252 --- /dev/null +++ b/test/unit/common/test_notification.py @@ -0,0 +1,89 @@ +import mock +import unittest2 as unittest + +from akanda.rug.common import notification + + +class NotificationTest(notification.NotificationMixin): + @notification.handles('foo') + def handle_foo(self, tenant_id, payload): + pass + + @notification.handles('bar') + @notification.handles('baz') + def handle_multi(self, tenant_id, payload): + pass + + def default_notification_handler(self): + pass + + +class TestNotificationMixin(unittest.TestCase): + def test_init(self): + n = NotificationTest() + + def test_create_listener(self): + with mock.patch.object(notification, 'rpc') as rpc: + n = NotificationTest() + n.create_notification_listener('the_topic', 'the_exch') + + expected = [ + mock.call.create_connection(new=True), + mock.call.create_connection().declare_topic_consumer( + topic='the_topic', + callback=n._notification_mixin_dispatcher, + exchange_name='the_exch'), + mock.call.create_connection().consume_in_thread()] + + rpc.assert_has_calls(expected) + self.assertEqual(n._notification_handlers, + {'foo': [n.handle_foo], + 'bar': [n.handle_multi], + 'baz': [n.handle_multi]}) + + def test_dispatcher_known_event_type(self): + test_message = { + 'event_type': 'foo', + '_context_tenant_id': 'tenant_id', + 'payload': 'the_payload' + } + + mock_handler = mock.Mock() + + n = NotificationTest() + n._notification_handlers = {'foo': [mock_handler]} + + n._notification_mixin_dispatcher(test_message) + mock_handler.assert_called_once_with('tenant_id', 'the_payload') + + def test_dispatcher_unknown_event_type(self): + test_message = { + 'event_type': 'mystery', + '_context_tenant_id': 'tenant_id', + 'payload': 'the_payload' + } + + n = NotificationTest() + n._notification_handlers = {} + + with mock.patch.object(n, 'default_notification_handler') as dh: + n._notification_mixin_dispatcher(test_message) + dh.assert_called_once_with('mystery', 'tenant_id', 'the_payload') + + def test_exception_during_dispatcher(self): + test_message = { + 'event_type': 'foo', + '_context_tenant_id': 'tenant_id', + 'payload': 'the_payload' + } + + mock_handler = mock.Mock() + mock_handler.side_effect = Exception + + n = NotificationTest() + n._notification_handlers = {'foo': [mock_handler]} + + with mock.patch.object(notification, 'LOG') as log: + n._notification_mixin_dispatcher(test_message) + mock_handler.assert_called_once_with('tenant_id', 'the_payload') + log.assert_has_calls([mock.call.exception(mock.ANY)]) diff --git a/test/unit/common/test_task.py b/test/unit/common/test_task.py new file mode 100644 index 00000000..91acc06b --- /dev/null +++ b/test/unit/common/test_task.py @@ -0,0 +1,128 @@ +import mock +import unittest2 as unittest + +from akanda.rug.common import task + + +class TestTask(unittest.TestCase): + def test_init(self): + t = task.Task('the_method', 'data') + self.assertEqual(t.method, 'the_method') + self.assertEqual(t.data, 'data') + self.assertEqual(t.current, 0) + self.assertEqual(t.max_attempts, 3) + + def test_call(self): + method = mock.Mock() + + t = task.Task(method, 'data', 3) + t() + + self.assertEqual(t.current, 1) + method.assert_called_once_with('data') + + def test_should_retry(self): + method = mock.Mock() + + t = task.Task(method, 'data', 1) + self.assertTrue(t.should_retry()) + t() + self.assertFalse(t.should_retry()) + method.assert_called_once_with('data') + + def test_repr(self): + method = mock.Mock() + method.__name__ = 'method' + + t = task.Task(method, 'data', 1) + + self.assertEqual( + repr(t), + '') + + +class TestTaskManager(unittest.TestCase): + def test_init(self): + tm = task.TaskManager(10) + + def test_put(self): + tm = task.TaskManager(10) + tm.put('method', 'data') + + self.assertEqual(tm.task_queue.qsize(), 1) + qt = tm.task_queue.get() + + self.assertEqual(qt.method, 'method') + self.assertEqual(qt.data, 'data') + self.assertEqual(qt.max_attempts, 3) + + def test_start(self): + with mock.patch('eventlet.spawn') as spawn: + tm = task.TaskManager(10) + tm.start() + + spawn.assert_has_calls([ + mock.call.spawn(tm._serialized_task_runner), + mock.call.spawn(tm._requeue_failed)]) + + def test_task_runner(self): + tm = task.TaskManager(10) + + with mock.patch.object(tm, 'task_queue') as q: + with mock.patch.object(task.LOG, 'info') as info: + info.side_effect = [None, IOError] + try: + tm._serialized_task_runner() + except IOError: + pass + q.assert_has_calls([mock.call.get(), mock.call.get()()]) + + def test_task_runner_exception_during_task(self): + tm = task.TaskManager(10) + + mock_task = mock.Mock() + mock_task.should_retry.return_value = True + mock_task.side_effect = Exception + + with mock.patch.object(tm, 'task_queue') as q: + q.get.return_value = mock_task + with mock.patch.object(task.LOG, 'info') as info: + info.side_effect = [None, IOError] + try: + tm._serialized_task_runner() + except IOError: + pass + q.assert_has_calls([mock.call.get(), mock.call.get()()]) + self.assertEqual(tm.delay_queue.qsize(), 1) + + def test_task_runner_exception_during_task_out_of_retries(self): + tm = task.TaskManager(10) + + mock_task = mock.Mock() + mock_task.should_retry.return_value = False + mock_task.side_effect = Exception + + with mock.patch.object(tm, 'task_queue') as q: + q.get.return_value = mock_task + with mock.patch.object(task.LOG, 'info') as info: + info.side_effect = [None, IOError] + with mock.patch.object(task.LOG, 'error') as error: + try: + tm._serialized_task_runner() + except IOError: + pass + q.assert_has_calls([mock.call.get(), mock.call.get()()]) + self.assertEqual(tm.delay_queue.qsize(), 0) + self.assertEqual(len(error.mock_calls), 2) + + def test_requeue_failed(self): + tm = task.TaskManager(10) + with mock.patch('eventlet.sleep') as sleep: + sleep.side_effect = [None, IOError] + tm.delay_queue.put(mock.Mock()) + try: + tm._requeue_failed() + except IOError: + pass + self.assertEqual(tm.task_queue.qsize(), 1) + self.assertEqual(tm.delay_queue.qsize(), 0)