diff --git a/contrib/devstack/lib/cue b/contrib/devstack/lib/cue index 20c6d0e8..84355df4 100644 --- a/contrib/devstack/lib/cue +++ b/contrib/devstack/lib/cue @@ -99,8 +99,8 @@ function configure_cue { fi # Set cluster node check timeouts - iniset $CUE_CONF taskflow cluster_node_check_timeout 10 - iniset $CUE_CONF taskflow cluster_node_check_max_count 30 + iniset $CUE_CONF taskflow cluster_node_check_timeout 30 + iniset $CUE_CONF taskflow cluster_node_check_max_count 120 # Set flow create cluster node vm active retry count iniset $CUE_CONF flow_options create_cluster_node_vm_active_retry_count $CUE_TF_CREATE_CLUSTER_NODE_VM_ACTIVE_RETRY_COUNT @@ -207,6 +207,7 @@ function init_cue { $OPENSTACK_CMD security group rule create --src-ip 0.0.0.0/0 --proto tcp --dst-port 5672:5672 $CUE_RABBIT_SECURITY_GROUP $OPENSTACK_CMD security group rule create --src-ip 0.0.0.0/0 --proto tcp --dst-port 4369:4369 $CUE_RABBIT_SECURITY_GROUP $OPENSTACK_CMD security group rule create --src-ip 0.0.0.0/0 --proto tcp --dst-port 61000:61000 $CUE_RABBIT_SECURITY_GROUP + $OPENSTACK_CMD security group rule create --src-ip 0.0.0.0/0 --proto tcp --dst-port 15672:15672 $CUE_RABBIT_SECURITY_GROUP fi CUE_RABBIT_SECURITY_GROUP_ID=$($OPENSTACK_CMD security group list | grep $CUE_RABBIT_SECURITY_GROUP | tr -d ' ' | cut -f 2 -d '|') diff --git a/cue/taskflow/flow/create_cluster.py b/cue/taskflow/flow/create_cluster.py index bce8132e..05e07454 100644 --- a/cue/taskflow/flow/create_cluster.py +++ b/cue/taskflow/flow/create_cluster.py @@ -15,10 +15,13 @@ import oslo_config.cfg as cfg import taskflow.patterns.graph_flow as graph_flow +import taskflow.patterns.linear_flow as linear_flow +import taskflow.retry as retry from cue.db.sqlalchemy import models from cue.taskflow.flow import create_cluster_node import cue.taskflow.task as cue_tasks +import os_tasklib.common as os_common def create_cluster(cluster_id, node_ids, user_network_id, @@ -60,6 +63,25 @@ def create_cluster(cluster_id, node_ids, user_network_id, node_check_timeout = cfg.CONF.taskflow.cluster_node_check_timeout node_check_max_count = cfg.CONF.taskflow.cluster_node_check_max_count + check_rabbit_online = linear_flow.Flow( + name="wait for RabbitMQ ready state", + retry=retry.Times(node_check_max_count, revert_all=True)) + check_rabbit_online.add( + cue_tasks.GetRabbitClusterStatus( + name="get RabbitMQ status", + rebind={'vm_ip': "vm_management_ip_0"}, + provides="clustering_status", + inject={'proto': 'http'}), + os_common.CheckFor( + name="check cluster status", + rebind={'check_var': "clustering_status"}, + check_value='OK', + retry_delay_seconds=10), + ) + flow.add(check_rabbit_online) + + flow.link(check_rabbit_online, end_task) + #todo(dagnello): verify node_ids is a list and not a string for i, node_id in enumerate(node_ids): generate_userdata = cue_tasks.ClusterNodeUserData( @@ -71,7 +93,8 @@ def create_cluster(cluster_id, node_ids, user_network_id, create_cluster_node.create_cluster_node(cluster_id, i, node_id, flow, generate_userdata, start_task, - end_task, node_check_timeout, + check_rabbit_online, + node_check_timeout, node_check_max_count, user_network_id, management_network_id) diff --git a/cue/taskflow/flow/create_cluster_node.py b/cue/taskflow/flow/create_cluster_node.py index 17cbaef1..faf311d4 100644 --- a/cue/taskflow/flow/create_cluster_node.py +++ b/cue/taskflow/flow/create_cluster_node.py @@ -36,7 +36,7 @@ CONF.register_opts(FLOW_OPTS, group='flow_options') def create_cluster_node(cluster_id, node_number, node_id, graph_flow, - generate_userdata, start_task, end_task, + generate_userdata, start_task, post_task, node_check_timeout, node_check_max_count, user_network_id, management_network_id): """Create Cluster Node factory function @@ -53,8 +53,8 @@ def create_cluster_node(cluster_id, node_number, node_id, graph_flow, :type graph_flow: taskflow.patterns.graph_flow :param start_task: Update cluster status start task :type start_task: cue.taskflow.task.UpdateClusterStatus - :param end_task: Update cluster status end task - :type end_task: cue.taskflow.task.UpdateClusterStatus + :param post_task: Task/Subflow to run after the flow created here + :type post_task: taskflow task or flow :param generate_userdata: generate user data task :type generate_userdata: cue.taskflow.task.ClusterNodeUserData :param node_check_timeout: seconds wait between node status checks @@ -158,18 +158,6 @@ def create_cluster_node(cluster_id, node_number, node_id, graph_flow, graph_flow.add(check_vm_active) graph_flow.link(get_vm_id, check_vm_active) - #todo(dagnello): make retry times configurable - check_rabbit_online = linear_flow.Flow( - name="wait for RabbitMQ ready state %s" % node_name, - retry=retry.Times(node_check_max_count, revert_all=True)) - check_rabbit_online.add( - os_common.VerifyNetwork( - name="get RabbitMQ status %s" % node_name, - rebind={'vm_ip': "vm_management_ip_%d" % node_number}, - retry_delay_seconds=node_check_timeout)) - graph_flow.add(check_rabbit_online) - graph_flow.link(check_vm_active, check_rabbit_online) - build_node_info = os_common.Lambda( new_node_values, name="build new node values %s" % node_name, @@ -194,13 +182,13 @@ def create_cluster_node(cluster_id, node_number, node_id, graph_flow, provides="endpoint_values_%d" % node_number ) graph_flow.add(build_endpoint_info) - graph_flow.link(check_rabbit_online, build_endpoint_info) + graph_flow.link(check_vm_active, build_endpoint_info) create_endpoint = cue_tasks.CreateEndpoint( name="update endpoint for node %s" % node_name, rebind={'endpoint_values': "endpoint_values_%d" % node_number}) graph_flow.add(create_endpoint) - graph_flow.link(check_rabbit_online, create_endpoint) + graph_flow.link(check_vm_active, create_endpoint) - graph_flow.link(update_node, end_task) - graph_flow.link(create_endpoint, end_task) + graph_flow.link(update_node, post_task) + graph_flow.link(create_endpoint, post_task) diff --git a/cue/taskflow/task/__init__.py b/cue/taskflow/task/__init__.py index 619dddc0..ca575438 100644 --- a/cue/taskflow/task/__init__.py +++ b/cue/taskflow/task/__init__.py @@ -13,10 +13,10 @@ # License for the specific language governing permissions and limitations # under the License. -from check_or_restart_rabbitmq import CheckOrRestartRabbitMq # noqa from cluster_node_userdata import ClusterNodeUserData # noqa from create_endpoint_task import CreateEndpoint # noqa from get_node import GetNode # noqa +from get_rabbit_cluster_status import GetRabbitClusterStatus # noqa from update_cluster_task import UpdateClusterStatus # noqa from update_endpoints_task import UpdateEndpoints # noqa from update_node_task import UpdateNode # noqa diff --git a/cue/taskflow/task/check_or_restart_rabbitmq.py b/cue/taskflow/task/check_or_restart_rabbitmq.py deleted file mode 100644 index 08c51ef4..00000000 --- a/cue/taskflow/task/check_or_restart_rabbitmq.py +++ /dev/null @@ -1,97 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import telnetlib as telnet -import time - -import six -import taskflow.task - - -class CheckOrRestartRabbitMq(taskflow.task.Task): - """Check or Restart RabbitMQ VM - - This task either checks that RabbitMQ is running or restarts the VM, - depending on the supplied action. - - """ - - def __init__(self, - os_client, - retry_delay_seconds=None, - retry_delay_ms=None, - name=None, - **kwargs): - """Constructor - - This constructor sets the retry delay for this task's revert method. - - :param retry_delay_seconds: retry delay in seconds - :param retry_delay_ms: retry delay in milliseconds - :param name: unique name for atom - """ - super(CheckOrRestartRabbitMq, self).__init__(name=name, **kwargs) - - self.os_client = os_client - self.sleep_time = 0 - if retry_delay_seconds: - self.sleep_time = retry_delay_seconds - - if retry_delay_ms: - self.sleep_time += retry_delay_ms / 1000.0 - - def execute(self, action, vm_info, vm_ip, port, **kwargs): - """main execute method - - :param action: The request context in dict format. - :type action: oslo_context.RequestContext - :param vm_info: Unique ID for the node. - :type vm_info: dict or string - :param vm_ip: - :type vm_ip: - :param port: - :type port: - """ - if six.PY2 and isinstance(port, unicode): - check_port = port.encode() - else: - check_port = port - - if isinstance(vm_info, dict): - vm_id = vm_info['id'] - else: - vm_id = vm_info - - if action == 'restart': - self.os_client.servers.reboot(vm_id) - - tn = telnet.Telnet() - tn.open(vm_ip, check_port, timeout=10) - - def revert(self, *args, **kwargs): - """Revert CreateVmTask - - This method is executed upon failure of the GetRabbitVmStatus or the - Flow that the Task is part of. - - :param args: positional arguments that the task required to execute. - :type args: list - :param kwargs: keyword arguments that the task required to execute; the - special key `result` will contain the :meth:`execute` - results (if any) and the special key `flow_failures` - will contain any failure information. - """ - if self.sleep_time != 0: - time.sleep(self.sleep_time) diff --git a/cue/taskflow/task/get_rabbit_cluster_status.py b/cue/taskflow/task/get_rabbit_cluster_status.py new file mode 100644 index 00000000..19042918 --- /dev/null +++ b/cue/taskflow/task/get_rabbit_cluster_status.py @@ -0,0 +1,85 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json +import urllib +import urllib2 + +import taskflow.task + + +class GetRabbitClusterStatus(taskflow.task.Task): + def execute(self, + vm_ip, + default_rabbit_user, + default_rabbit_pass, + proto=None, + **kwargs): + """Main execute method to verify Rabbitmq clustering in a VM + + :param vm_ip: VM ip address + :type vm_ip: string + :param default_rabbit_user: Username for RabbitMQ + :type default_rabbit_user: string + :param default_rabbit_pass: Password for RabbitMQ + :type default_rabbit_pass: string + :param proto: Protocol for url, http/https + :type proto: string + """ + if not proto: + proto = 'http' + + base_url = proto + '://' + vm_ip + ':15672/api' + + # RMQ management url to get a list of the existing vhosts + vhosts_url = base_url + '/vhosts' + # RMQ management url to query whether the RMQ nodes have clustered + aliveness_url = base_url + '/aliveness-test' + + password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm() + password_mgr.add_password(None, + vhosts_url, + default_rabbit_user, + default_rabbit_pass) + handler = urllib2.HTTPBasicAuthHandler(password_mgr) + opener = urllib2.build_opener(handler) + + retval = "NOT-OK" + + try: + res = opener.open(vhosts_url) + except urllib2.URLError: + pass + else: + json_res = json.load(res) + + result = {} + for x in json_res: + vhost = urllib.quote(x['name'], '') + cur_aliveness_url = aliveness_url + '/' + vhost + + password_mgr.add_password(None, + cur_aliveness_url, + default_rabbit_user, + default_rabbit_pass) + handler = urllib2.HTTPBasicAuthHandler(password_mgr) + opener = urllib2.build_opener(handler) + res = opener.open(cur_aliveness_url) + + result[x['name']] = json.load(res)['status'] + if all(val == 'ok' for val in result.values()): + retval = 'OK' + + return retval \ No newline at end of file diff --git a/cue/templates/install_rabbit.sh.tmpl b/cue/templates/install_rabbit.sh.tmpl index be6a67cf..59bd66fa 100644 --- a/cue/templates/install_rabbit.sh.tmpl +++ b/cue/templates/install_rabbit.sh.tmpl @@ -51,6 +51,8 @@ rabbitmq hard nofile 65536 rabbitmq soft nofile 65536 EOF +rabbitmq-plugins enable rabbitmq_management + update-rc.d rabbitmq-server enable while [ ! -z "$(service rabbitmq-server status | grep 'nodedown')" ]; do diff --git a/cue/tests/functional/fixtures/urllib2_fixture.py b/cue/tests/functional/fixtures/urllib2_fixture.py new file mode 100644 index 00000000..6fd3a118 --- /dev/null +++ b/cue/tests/functional/fixtures/urllib2_fixture.py @@ -0,0 +1,69 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import StringIO +import urllib2 + +import cue.tests.functional.fixtures.base as base + + +class Urllib2ResultDetails(object): + urllib2_result_list = [] + + @staticmethod + def set_urllib2_result(results): + """Helper function to setup sequence of fixed results. + + :param results: list of results + """ + for result in results: + io = StringIO.StringIO(result) + Urllib2ResultDetails.urllib2_result_list.append(io) + + @staticmethod + def get_urllib2_result(): + """Returns the next result in configured sequence. + + If result sequence is empty, an empty string result of '' is returned. + + :return: Current urllib result in urllib2_result_list. + """ + if len(Urllib2ResultDetails.urllib2_result_list) == 0: + result = '' + else: + result = Urllib2ResultDetails.urllib2_result_list.pop() + return result + + +class Urllib2Fixture(base.BaseFixture): + """A test fixture to simulate urllib2 calls + + This class is used in test cases to simulate urllib2 calls. + """ + def __init__(self, *args, **kwargs): + super(Urllib2Fixture, self).__init__(*args, **kwargs) + + def setUp(self): + """Set up test fixture and apply all method overrides.""" + super(Urllib2Fixture, self).setUp() + + urllib2_client = self.mock('urllib2.OpenerDirector') + urllib2_client.open = self.open + + def open(self, url): + result = Urllib2ResultDetails.get_urllib2_result() + if result.getvalue() is 'URLError': + raise urllib2.URLError('urlerror') + return result \ No newline at end of file diff --git a/cue/tests/functional/taskflow/flow/test_create_cluster.py b/cue/tests/functional/taskflow/flow/test_create_cluster.py index 0e286721..dd6cff1d 100644 --- a/cue/tests/functional/taskflow/flow/test_create_cluster.py +++ b/cue/tests/functional/taskflow/flow/test_create_cluster.py @@ -22,7 +22,7 @@ from cue.taskflow.flow import create_cluster from cue.tests.functional import base from cue.tests.functional.fixtures import neutron from cue.tests.functional.fixtures import nova -from cue.tests.functional.fixtures import telnet +from cue.tests.functional.fixtures import urllib2_fixture from taskflow import engines import taskflow.exceptions as taskflow_exc @@ -32,7 +32,7 @@ class CreateClusterTests(base.FunctionalTestCase): additional_fixtures = [ nova.NovaClient, neutron.NeutronClient, - telnet.TelnetClient + urllib2_fixture.Urllib2Fixture ] def setUp(self): @@ -66,6 +66,19 @@ class CreateClusterTests(base.FunctionalTestCase): name=management_network_name) self.management_network = network_list['networks'][0] + # Todo(Dan) If testing becomes asynchronous, then there is no guarantee + # that these urllib return results will come in the proper order. Will + # have to update the urllib2 fixture to respond appropriately for the + # url passed in. + urllib2_fixture.Urllib2ResultDetails.set_urllib2_result( + ['{"status": "ok"}', + '[{"name": "/"}]', + '{"status": "ok"}', + '[{"name": "/"}]', + '{"status": "ok"}', + '[{"name": "/"}]'] + ) + def test_create_cluster(self): flow_store = { "image": self.valid_image.id, diff --git a/cue/tests/functional/taskflow/flow/test_delete_cluster.py b/cue/tests/functional/taskflow/flow/test_delete_cluster.py index f79ebf4d..ed0f7288 100644 --- a/cue/tests/functional/taskflow/flow/test_delete_cluster.py +++ b/cue/tests/functional/taskflow/flow/test_delete_cluster.py @@ -24,7 +24,7 @@ from cue.taskflow.flow import delete_cluster from cue.tests.functional import base from cue.tests.functional.fixtures import neutron from cue.tests.functional.fixtures import nova -from cue.tests.functional.fixtures import telnet +from cue.tests.functional.fixtures import urllib2_fixture from taskflow import engines import taskflow.exceptions as taskflow_exc @@ -34,7 +34,7 @@ class DeleteClusterTests(base.FunctionalTestCase): additional_fixtures = [ nova.NovaClient, neutron.NeutronClient, - telnet.TelnetClient + urllib2_fixture.Urllib2Fixture ] def setUp(self): @@ -68,6 +68,19 @@ class DeleteClusterTests(base.FunctionalTestCase): name=management_network_name) self.management_network = network_list['networks'][0] + # Todo(Dan) If testing becomes asynchronous, then there is no guarantee + # that these urllib return results will come in the proper order. Will + # have to update the urllib2 fixture to respond appropriately for the + # url passed in. + urllib2_fixture.Urllib2ResultDetails.set_urllib2_result( + ['{"status": "ok"}', + '[{"name": "/"}]', + '{"status": "ok"}', + '[{"name": "/"}]', + '{"status": "ok"}', + '[{"name": "/"}]'] + ) + def test_delete_cluster(self): flow_store_create = { "image": self.valid_image.id, diff --git a/cue/tests/functional/taskflow/task/test_check_or_restart_rabbitmq.py b/cue/tests/functional/taskflow/task/test_check_or_restart_rabbitmq.py deleted file mode 100644 index 50610b7b..00000000 --- a/cue/tests/functional/taskflow/task/test_check_or_restart_rabbitmq.py +++ /dev/null @@ -1,110 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import cue.client as client -import cue.taskflow.task as cue_task -from cue.tests.functional import base -from cue.tests.functional.fixtures import nova -from cue.tests.functional.fixtures import telnet - -from taskflow import engines -from taskflow.patterns import linear_flow -import taskflow.retry as retry - -import uuid - - -class CheckOrRestartRabbitTest(base.FunctionalTestCase): - additional_fixtures = [ - nova.NovaClient, - telnet.TelnetClient - ] - - task_store = { - 'vm_ip': "0.0.0.0", - 'vm_info': str(uuid.uuid4()), - 'port': '5672' - } - - def setUp(self): - super(CheckOrRestartRabbitTest, self).setUp() - - self.nova_client = client.nova_client() - - rabbitmq_retry_strategy = [ - 'check', - 'check', - 'check', - 'check', - 'restart', - ] - - retry_controller = retry.ForEach(rabbitmq_retry_strategy, - "retry check RabbitMQ", - provides="retry_strategy") - self.flow = linear_flow.Flow(name="wait for RabbitMQ ready state", - retry=retry_controller).add( - cue_task.CheckOrRestartRabbitMq( - os_client=self.nova_client, - name="get RabbitMQ status %s", - rebind={'action': "retry_strategy"}, - retry_delay_seconds=1)) - - def test_get_rabbit_status(self): - """Verifies GetRabbitVmStatus task directly.""" - - # start engine to run task - engines.run(self.flow, store=CheckOrRestartRabbitTest.task_store) - - def test_get_vm_status_flow(self): - """Verifies GetRabbitVmStatus in a successful retry flow. - - This test simulates creating a cluster, then directly running a flow - which will fail until telnet_status acquired from get_rabbit_vm_status - task returns 'connect'. Attempting the telnet connection will return - 'wait' for the first three times, then will return 'connected' once a - telnet connection is made. The node status should be in Active state. - """ - # configure custom vm_status list - telnet.TelnetStatusDetails.set_telnet_status(['connected', - 'wait', - 'wait', - 'wait']) - - # create flow with "GetRabbitVmStatus" task - # start engine to run task - engines.run(self.flow, store=CheckOrRestartRabbitTest.task_store) - self.assertFalse(self.nova_client.servers.reboot.called) - - def test_get_vm_status_flow_fail(self): - """Verifies GetRabbitVmStatus in an unsuccessful retry flow. - - This test simulates creating a cluster, then directly running a flow - which will fail until the retry count has been exhausted. Attempting - the telnet connection will return 'wait' until retry count reaches - limit and flow fails. The node status should remain in Building state. - """ - # configure custom vm_status list - telnet.TelnetStatusDetails.set_telnet_status(['wait', - 'wait', - 'wait', - 'wait', - 'wait', - 'wait']) - - # start engine to run task - self.assertRaises(IOError, engines.run, self.flow, - store=CheckOrRestartRabbitTest.task_store) - self.assertTrue(self.nova_client.servers.reboot.called) diff --git a/cue/tests/functional/taskflow/task/test_get_rabbit_cluster_status.py b/cue/tests/functional/taskflow/task/test_get_rabbit_cluster_status.py new file mode 100644 index 00000000..e3a46a53 --- /dev/null +++ b/cue/tests/functional/taskflow/task/test_get_rabbit_cluster_status.py @@ -0,0 +1,90 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from cue.taskflow.task import get_rabbit_cluster_status +from cue.tests.functional import base +from cue.tests.functional.fixtures import urllib2_fixture as urllib2_fixture + +from taskflow import engines +from taskflow.patterns import linear_flow + +import urllib2 + + +class GetRabbitClusterStatusTest(base.FunctionalTestCase): + additional_fixtures = [ + urllib2_fixture.Urllib2Fixture + ] + + task_store = { + "vm_ip": "0.0.0.0", + "default_rabbit_user": "user", + "default_rabbit_pass": "pass" + } + + def setUp(self): + super(GetRabbitClusterStatusTest, self).setUp() + + def test_get_rabbit_cluster_status(self): + """Verifies GetRabbitClusterStatus task.""" + + urllib2_fixture.Urllib2ResultDetails.set_urllib2_result( + ['{"status": "ok"}', + '{"status": "clustering"}', + '[{"name": "/"}]'] + ) + + self.flow = linear_flow.Flow(name="wait for RabbitMQ ready state").add( + get_rabbit_cluster_status.GetRabbitClusterStatus( + name="get RabbitMQ status")) + + # start engine + engines.run(self.flow, store=GetRabbitClusterStatusTest.task_store) + + def test_get_rabbit_cluster_status_fail(self): + """Verifies unsuccessful path. + + This test simulates when the RMQ vm is not responding to the + urllib2.open calls on the RMQ management port. + """ + urllib2_fixture.Urllib2ResultDetails.set_urllib2_result( + ['{}', + '[]'] + ) + + self.flow = linear_flow.Flow(name="wait for RabbitMQ ready state").add( + get_rabbit_cluster_status.GetRabbitClusterStatus( + name="get RabbitMQ status")) + + # start engine + engines.run(self.flow, store=GetRabbitClusterStatusTest.task_store) + + def test_get_rabbit_cluster_status_connection_failed(self): + """Verifies GetRabbitClusterStatus task.""" + + urllib2_fixture.Urllib2ResultDetails.set_urllib2_result( + ['URLError'] + ) + + self.flow = linear_flow.Flow(name="wait for RabbitMQ ready state").add( + get_rabbit_cluster_status.GetRabbitClusterStatus( + name="get RabbitMQ status")) + + try: + # start engine + engines.run(self.flow, store=GetRabbitClusterStatusTest.task_store) + except urllib2.URLError: + # Expected + pass \ No newline at end of file diff --git a/cue/tests/functional/taskflow/task/test_get_rabbit_vm_status.py b/cue/tests/functional/taskflow/task/test_get_rabbit_vm_status.py deleted file mode 100644 index d5357ab9..00000000 --- a/cue/tests/functional/taskflow/task/test_get_rabbit_vm_status.py +++ /dev/null @@ -1,99 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from cue.tests.functional import base -from cue.tests.functional.fixtures import telnet -import os_tasklib.common.verify_network_task as verify_network_task - -from taskflow import engines -from taskflow.patterns import linear_flow -import taskflow.retry as retry - - -class GetRabbitVmStatusTest(base.FunctionalTestCase): - additional_fixtures = [ - telnet.TelnetClient - ] - - task_store = { - 'vm_ip': "0.0.0.0", - 'port': '5672' - } - - def setUp(self): - super(GetRabbitVmStatusTest, self).setUp() - - def test_get_rabbit_status(self): - """Verifies GetRabbitVmStatus task directly.""" - - # create flow with "GetRabbitVmStatus" task - self.flow = linear_flow.Flow(name="wait for RabbitMQ ready state", - retry=retry.Times(5)).add( - verify_network_task.VerifyNetwork( - name="get RabbitMQ status", - retry_delay_seconds=1)) - - # start engine to run task - engines.run(self.flow, store=GetRabbitVmStatusTest.task_store) - - def test_get_vm_status_flow(self): - """Verifies GetRabbitVmStatus in a successful retry flow. - - This test simulates creating a cluster, then directly running a flow - which will fail until telnet_status acquired from get_rabbit_vm_status - task returns 'connect'. Attempting the telnet connection will return - 'wait' for the first three times, then will return 'connected' once a - telnet connection is made. The node status should be in Active state. - """ - # configure custom vm_status list - telnet.TelnetStatusDetails.set_telnet_status(['connected', - 'wait', - 'wait', - 'wait']) - - # create flow with "GetRabbitVmStatus" task - self.flow = linear_flow.Flow(name="wait for RabbitMQ ready state", - retry=retry.Times(5)).add( - verify_network_task.VerifyNetwork( - name="get RabbitMQ status", - retry_delay_seconds=1)) - - # start engine to run task - engines.run(self.flow, store=GetRabbitVmStatusTest.task_store) - - def test_get_vm_status_flow_fail(self): - """Verifies GetRabbitVmStatus in an unsuccessful retry flow. - - This test simulates creating a cluster, then directly running a flow - which will fail until the retry count has been exhausted. Attempting - the telnet connection will return 'wait' until retry count reaches - limit and flow fails. The node status should remain in Building state. - """ - # configure custom vm_status list - telnet.TelnetStatusDetails.set_telnet_status(['wait', - 'wait', - 'wait', - 'wait']) - - # create flow with "GetRabbitVmStatus" task - self.flow = linear_flow.Flow(name="wait for RabbitMQ ready state", - retry=retry.Times(3)).add( - verify_network_task.VerifyNetwork( - name="get RabbitMQ status", - retry_delay_seconds=1)) - - # start engine to run task - self.assertRaises(IOError, engines.run, self.flow, - store=GetRabbitVmStatusTest.task_store) \ No newline at end of file diff --git a/os_tasklib/common/__init__.py b/os_tasklib/common/__init__.py index 6bf4d11b..da298c66 100644 --- a/os_tasklib/common/__init__.py +++ b/os_tasklib/common/__init__.py @@ -18,4 +18,3 @@ from check_for import CheckFor # noqa from lambda_task import Lambda # noqa from map_task import Map # noqa from reduce_task import Reduce # noqa -from verify_network_task import VerifyNetwork # noqa diff --git a/os_tasklib/common/verify_network_task.py b/os_tasklib/common/verify_network_task.py deleted file mode 100644 index 394861fa..00000000 --- a/os_tasklib/common/verify_network_task.py +++ /dev/null @@ -1,75 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import telnetlib as telnet -import time - -import six -import taskflow.task - - -class VerifyNetwork(taskflow.task.Task): - def __init__(self, - retry_delay_seconds=None, - retry_delay_ms=None, - name=None, - **kwargs): - """Constructor - - This constructor sets the retry delay for this task's revert method. - - :param retry_delay_seconds: retry delay in seconds - :param retry_delay_ms: retry delay in milliseconds - :param name: unique name for atom - """ - super(VerifyNetwork, self).__init__(name=name, **kwargs) - - self.sleep_time = 0 - if retry_delay_seconds: - self.sleep_time = retry_delay_seconds - - if retry_delay_ms: - self.sleep_time += retry_delay_ms / 1000.0 - - def execute(self, vm_ip, port, **kwargs): - """Main execute method to verify network connection in a VM - - :param vm_ip: VM ip address - :type vm_ip: string - :param port: host service port - :type port: int - """ - if six.PY2 and isinstance(port, unicode): - check_port = port.encode() - else: - check_port = port - tn = telnet.Telnet() - tn.open(vm_ip, check_port, timeout=10) - - def revert(self, *args, **kwargs): - """Revert CreateVmTask - - This method is executed upon failure of the GetRabbitVmStatus or the - Flow that the Task is part of. - - :param args: positional arguments that the task required to execute. - :type args: list - :param kwargs: keyword arguments that the task required to execute; the - special key `result` will contain the :meth:`execute` - results (if any) and the special key `flow_failures` - will contain any failure information. - """ - if self.sleep_time != 0: - time.sleep(self.sleep_time)