From fe9f3767fb3511eaad5d6f6961b6c4640c02b87f Mon Sep 17 00:00:00 2001 From: Daniel Allegood Date: Mon, 27 Jul 2015 14:18:11 -0700 Subject: [PATCH] Check RabbitMQ cluster status via REST API during cluster create This is replacing the TaskFlow task to check when a rabbit node begins to respond to port 5672. We now use rabbits management plugin and REST API to check when the cluster is active and able to consume jobs. The new task is added at the end of the create_cluster flow. This is (arbitrarily) going to the 0th node's RabbitMQ Management port. Change-Id: I9dffdefd9cdcd6317f5c3114c59b4b30b8218239 --- contrib/devstack/lib/cue | 5 +- cue/taskflow/flow/create_cluster.py | 25 +++- cue/taskflow/flow/create_cluster_node.py | 26 ++--- cue/taskflow/task/__init__.py | 2 +- .../task/check_or_restart_rabbitmq.py | 97 --------------- .../task/get_rabbit_cluster_status.py | 85 ++++++++++++++ cue/templates/install_rabbit.sh.tmpl | 2 + .../functional/fixtures/urllib2_fixture.py | 69 +++++++++++ .../taskflow/flow/test_create_cluster.py | 17 ++- .../taskflow/flow/test_delete_cluster.py | 17 ++- .../task/test_check_or_restart_rabbitmq.py | 110 ------------------ .../task/test_get_rabbit_cluster_status.py | 90 ++++++++++++++ .../task/test_get_rabbit_vm_status.py | 99 ---------------- os_tasklib/common/__init__.py | 1 - os_tasklib/common/verify_network_task.py | 75 ------------ 15 files changed, 311 insertions(+), 409 deletions(-) delete mode 100644 cue/taskflow/task/check_or_restart_rabbitmq.py create mode 100644 cue/taskflow/task/get_rabbit_cluster_status.py create mode 100644 cue/tests/functional/fixtures/urllib2_fixture.py delete mode 100644 cue/tests/functional/taskflow/task/test_check_or_restart_rabbitmq.py create mode 100644 cue/tests/functional/taskflow/task/test_get_rabbit_cluster_status.py delete mode 100644 cue/tests/functional/taskflow/task/test_get_rabbit_vm_status.py delete mode 100644 os_tasklib/common/verify_network_task.py diff --git a/contrib/devstack/lib/cue b/contrib/devstack/lib/cue index 20c6d0e8..84355df4 100644 --- a/contrib/devstack/lib/cue +++ b/contrib/devstack/lib/cue @@ -99,8 +99,8 @@ function configure_cue { fi # Set cluster node check timeouts - iniset $CUE_CONF taskflow cluster_node_check_timeout 10 - iniset $CUE_CONF taskflow cluster_node_check_max_count 30 + iniset $CUE_CONF taskflow cluster_node_check_timeout 30 + iniset $CUE_CONF taskflow cluster_node_check_max_count 120 # Set flow create cluster node vm active retry count iniset $CUE_CONF flow_options create_cluster_node_vm_active_retry_count $CUE_TF_CREATE_CLUSTER_NODE_VM_ACTIVE_RETRY_COUNT @@ -207,6 +207,7 @@ function init_cue { $OPENSTACK_CMD security group rule create --src-ip 0.0.0.0/0 --proto tcp --dst-port 5672:5672 $CUE_RABBIT_SECURITY_GROUP $OPENSTACK_CMD security group rule create --src-ip 0.0.0.0/0 --proto tcp --dst-port 4369:4369 $CUE_RABBIT_SECURITY_GROUP $OPENSTACK_CMD security group rule create --src-ip 0.0.0.0/0 --proto tcp --dst-port 61000:61000 $CUE_RABBIT_SECURITY_GROUP + $OPENSTACK_CMD security group rule create --src-ip 0.0.0.0/0 --proto tcp --dst-port 15672:15672 $CUE_RABBIT_SECURITY_GROUP fi CUE_RABBIT_SECURITY_GROUP_ID=$($OPENSTACK_CMD security group list | grep $CUE_RABBIT_SECURITY_GROUP | tr -d ' ' | cut -f 2 -d '|') diff --git a/cue/taskflow/flow/create_cluster.py b/cue/taskflow/flow/create_cluster.py index bce8132e..05e07454 100644 --- a/cue/taskflow/flow/create_cluster.py +++ b/cue/taskflow/flow/create_cluster.py @@ -15,10 +15,13 @@ import oslo_config.cfg as cfg import taskflow.patterns.graph_flow as graph_flow +import taskflow.patterns.linear_flow as linear_flow +import taskflow.retry as retry from cue.db.sqlalchemy import models from cue.taskflow.flow import create_cluster_node import cue.taskflow.task as cue_tasks +import os_tasklib.common as os_common def create_cluster(cluster_id, node_ids, user_network_id, @@ -60,6 +63,25 @@ def create_cluster(cluster_id, node_ids, user_network_id, node_check_timeout = cfg.CONF.taskflow.cluster_node_check_timeout node_check_max_count = cfg.CONF.taskflow.cluster_node_check_max_count + check_rabbit_online = linear_flow.Flow( + name="wait for RabbitMQ ready state", + retry=retry.Times(node_check_max_count, revert_all=True)) + check_rabbit_online.add( + cue_tasks.GetRabbitClusterStatus( + name="get RabbitMQ status", + rebind={'vm_ip': "vm_management_ip_0"}, + provides="clustering_status", + inject={'proto': 'http'}), + os_common.CheckFor( + name="check cluster status", + rebind={'check_var': "clustering_status"}, + check_value='OK', + retry_delay_seconds=10), + ) + flow.add(check_rabbit_online) + + flow.link(check_rabbit_online, end_task) + #todo(dagnello): verify node_ids is a list and not a string for i, node_id in enumerate(node_ids): generate_userdata = cue_tasks.ClusterNodeUserData( @@ -71,7 +93,8 @@ def create_cluster(cluster_id, node_ids, user_network_id, create_cluster_node.create_cluster_node(cluster_id, i, node_id, flow, generate_userdata, start_task, - end_task, node_check_timeout, + check_rabbit_online, + node_check_timeout, node_check_max_count, user_network_id, management_network_id) diff --git a/cue/taskflow/flow/create_cluster_node.py b/cue/taskflow/flow/create_cluster_node.py index 17cbaef1..faf311d4 100644 --- a/cue/taskflow/flow/create_cluster_node.py +++ b/cue/taskflow/flow/create_cluster_node.py @@ -36,7 +36,7 @@ CONF.register_opts(FLOW_OPTS, group='flow_options') def create_cluster_node(cluster_id, node_number, node_id, graph_flow, - generate_userdata, start_task, end_task, + generate_userdata, start_task, post_task, node_check_timeout, node_check_max_count, user_network_id, management_network_id): """Create Cluster Node factory function @@ -53,8 +53,8 @@ def create_cluster_node(cluster_id, node_number, node_id, graph_flow, :type graph_flow: taskflow.patterns.graph_flow :param start_task: Update cluster status start task :type start_task: cue.taskflow.task.UpdateClusterStatus - :param end_task: Update cluster status end task - :type end_task: cue.taskflow.task.UpdateClusterStatus + :param post_task: Task/Subflow to run after the flow created here + :type post_task: taskflow task or flow :param generate_userdata: generate user data task :type generate_userdata: cue.taskflow.task.ClusterNodeUserData :param node_check_timeout: seconds wait between node status checks @@ -158,18 +158,6 @@ def create_cluster_node(cluster_id, node_number, node_id, graph_flow, graph_flow.add(check_vm_active) graph_flow.link(get_vm_id, check_vm_active) - #todo(dagnello): make retry times configurable - check_rabbit_online = linear_flow.Flow( - name="wait for RabbitMQ ready state %s" % node_name, - retry=retry.Times(node_check_max_count, revert_all=True)) - check_rabbit_online.add( - os_common.VerifyNetwork( - name="get RabbitMQ status %s" % node_name, - rebind={'vm_ip': "vm_management_ip_%d" % node_number}, - retry_delay_seconds=node_check_timeout)) - graph_flow.add(check_rabbit_online) - graph_flow.link(check_vm_active, check_rabbit_online) - build_node_info = os_common.Lambda( new_node_values, name="build new node values %s" % node_name, @@ -194,13 +182,13 @@ def create_cluster_node(cluster_id, node_number, node_id, graph_flow, provides="endpoint_values_%d" % node_number ) graph_flow.add(build_endpoint_info) - graph_flow.link(check_rabbit_online, build_endpoint_info) + graph_flow.link(check_vm_active, build_endpoint_info) create_endpoint = cue_tasks.CreateEndpoint( name="update endpoint for node %s" % node_name, rebind={'endpoint_values': "endpoint_values_%d" % node_number}) graph_flow.add(create_endpoint) - graph_flow.link(check_rabbit_online, create_endpoint) + graph_flow.link(check_vm_active, create_endpoint) - graph_flow.link(update_node, end_task) - graph_flow.link(create_endpoint, end_task) + graph_flow.link(update_node, post_task) + graph_flow.link(create_endpoint, post_task) diff --git a/cue/taskflow/task/__init__.py b/cue/taskflow/task/__init__.py index 619dddc0..ca575438 100644 --- a/cue/taskflow/task/__init__.py +++ b/cue/taskflow/task/__init__.py @@ -13,10 +13,10 @@ # License for the specific language governing permissions and limitations # under the License. -from check_or_restart_rabbitmq import CheckOrRestartRabbitMq # noqa from cluster_node_userdata import ClusterNodeUserData # noqa from create_endpoint_task import CreateEndpoint # noqa from get_node import GetNode # noqa +from get_rabbit_cluster_status import GetRabbitClusterStatus # noqa from update_cluster_task import UpdateClusterStatus # noqa from update_endpoints_task import UpdateEndpoints # noqa from update_node_task import UpdateNode # noqa diff --git a/cue/taskflow/task/check_or_restart_rabbitmq.py b/cue/taskflow/task/check_or_restart_rabbitmq.py deleted file mode 100644 index 08c51ef4..00000000 --- a/cue/taskflow/task/check_or_restart_rabbitmq.py +++ /dev/null @@ -1,97 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import telnetlib as telnet -import time - -import six -import taskflow.task - - -class CheckOrRestartRabbitMq(taskflow.task.Task): - """Check or Restart RabbitMQ VM - - This task either checks that RabbitMQ is running or restarts the VM, - depending on the supplied action. - - """ - - def __init__(self, - os_client, - retry_delay_seconds=None, - retry_delay_ms=None, - name=None, - **kwargs): - """Constructor - - This constructor sets the retry delay for this task's revert method. - - :param retry_delay_seconds: retry delay in seconds - :param retry_delay_ms: retry delay in milliseconds - :param name: unique name for atom - """ - super(CheckOrRestartRabbitMq, self).__init__(name=name, **kwargs) - - self.os_client = os_client - self.sleep_time = 0 - if retry_delay_seconds: - self.sleep_time = retry_delay_seconds - - if retry_delay_ms: - self.sleep_time += retry_delay_ms / 1000.0 - - def execute(self, action, vm_info, vm_ip, port, **kwargs): - """main execute method - - :param action: The request context in dict format. - :type action: oslo_context.RequestContext - :param vm_info: Unique ID for the node. - :type vm_info: dict or string - :param vm_ip: - :type vm_ip: - :param port: - :type port: - """ - if six.PY2 and isinstance(port, unicode): - check_port = port.encode() - else: - check_port = port - - if isinstance(vm_info, dict): - vm_id = vm_info['id'] - else: - vm_id = vm_info - - if action == 'restart': - self.os_client.servers.reboot(vm_id) - - tn = telnet.Telnet() - tn.open(vm_ip, check_port, timeout=10) - - def revert(self, *args, **kwargs): - """Revert CreateVmTask - - This method is executed upon failure of the GetRabbitVmStatus or the - Flow that the Task is part of. - - :param args: positional arguments that the task required to execute. - :type args: list - :param kwargs: keyword arguments that the task required to execute; the - special key `result` will contain the :meth:`execute` - results (if any) and the special key `flow_failures` - will contain any failure information. - """ - if self.sleep_time != 0: - time.sleep(self.sleep_time) diff --git a/cue/taskflow/task/get_rabbit_cluster_status.py b/cue/taskflow/task/get_rabbit_cluster_status.py new file mode 100644 index 00000000..19042918 --- /dev/null +++ b/cue/taskflow/task/get_rabbit_cluster_status.py @@ -0,0 +1,85 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json +import urllib +import urllib2 + +import taskflow.task + + +class GetRabbitClusterStatus(taskflow.task.Task): + def execute(self, + vm_ip, + default_rabbit_user, + default_rabbit_pass, + proto=None, + **kwargs): + """Main execute method to verify Rabbitmq clustering in a VM + + :param vm_ip: VM ip address + :type vm_ip: string + :param default_rabbit_user: Username for RabbitMQ + :type default_rabbit_user: string + :param default_rabbit_pass: Password for RabbitMQ + :type default_rabbit_pass: string + :param proto: Protocol for url, http/https + :type proto: string + """ + if not proto: + proto = 'http' + + base_url = proto + '://' + vm_ip + ':15672/api' + + # RMQ management url to get a list of the existing vhosts + vhosts_url = base_url + '/vhosts' + # RMQ management url to query whether the RMQ nodes have clustered + aliveness_url = base_url + '/aliveness-test' + + password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm() + password_mgr.add_password(None, + vhosts_url, + default_rabbit_user, + default_rabbit_pass) + handler = urllib2.HTTPBasicAuthHandler(password_mgr) + opener = urllib2.build_opener(handler) + + retval = "NOT-OK" + + try: + res = opener.open(vhosts_url) + except urllib2.URLError: + pass + else: + json_res = json.load(res) + + result = {} + for x in json_res: + vhost = urllib.quote(x['name'], '') + cur_aliveness_url = aliveness_url + '/' + vhost + + password_mgr.add_password(None, + cur_aliveness_url, + default_rabbit_user, + default_rabbit_pass) + handler = urllib2.HTTPBasicAuthHandler(password_mgr) + opener = urllib2.build_opener(handler) + res = opener.open(cur_aliveness_url) + + result[x['name']] = json.load(res)['status'] + if all(val == 'ok' for val in result.values()): + retval = 'OK' + + return retval \ No newline at end of file diff --git a/cue/templates/install_rabbit.sh.tmpl b/cue/templates/install_rabbit.sh.tmpl index be6a67cf..59bd66fa 100644 --- a/cue/templates/install_rabbit.sh.tmpl +++ b/cue/templates/install_rabbit.sh.tmpl @@ -51,6 +51,8 @@ rabbitmq hard nofile 65536 rabbitmq soft nofile 65536 EOF +rabbitmq-plugins enable rabbitmq_management + update-rc.d rabbitmq-server enable while [ ! -z "$(service rabbitmq-server status | grep 'nodedown')" ]; do diff --git a/cue/tests/functional/fixtures/urllib2_fixture.py b/cue/tests/functional/fixtures/urllib2_fixture.py new file mode 100644 index 00000000..6fd3a118 --- /dev/null +++ b/cue/tests/functional/fixtures/urllib2_fixture.py @@ -0,0 +1,69 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import StringIO +import urllib2 + +import cue.tests.functional.fixtures.base as base + + +class Urllib2ResultDetails(object): + urllib2_result_list = [] + + @staticmethod + def set_urllib2_result(results): + """Helper function to setup sequence of fixed results. + + :param results: list of results + """ + for result in results: + io = StringIO.StringIO(result) + Urllib2ResultDetails.urllib2_result_list.append(io) + + @staticmethod + def get_urllib2_result(): + """Returns the next result in configured sequence. + + If result sequence is empty, an empty string result of '' is returned. + + :return: Current urllib result in urllib2_result_list. + """ + if len(Urllib2ResultDetails.urllib2_result_list) == 0: + result = '' + else: + result = Urllib2ResultDetails.urllib2_result_list.pop() + return result + + +class Urllib2Fixture(base.BaseFixture): + """A test fixture to simulate urllib2 calls + + This class is used in test cases to simulate urllib2 calls. + """ + def __init__(self, *args, **kwargs): + super(Urllib2Fixture, self).__init__(*args, **kwargs) + + def setUp(self): + """Set up test fixture and apply all method overrides.""" + super(Urllib2Fixture, self).setUp() + + urllib2_client = self.mock('urllib2.OpenerDirector') + urllib2_client.open = self.open + + def open(self, url): + result = Urllib2ResultDetails.get_urllib2_result() + if result.getvalue() is 'URLError': + raise urllib2.URLError('urlerror') + return result \ No newline at end of file diff --git a/cue/tests/functional/taskflow/flow/test_create_cluster.py b/cue/tests/functional/taskflow/flow/test_create_cluster.py index 0e286721..dd6cff1d 100644 --- a/cue/tests/functional/taskflow/flow/test_create_cluster.py +++ b/cue/tests/functional/taskflow/flow/test_create_cluster.py @@ -22,7 +22,7 @@ from cue.taskflow.flow import create_cluster from cue.tests.functional import base from cue.tests.functional.fixtures import neutron from cue.tests.functional.fixtures import nova -from cue.tests.functional.fixtures import telnet +from cue.tests.functional.fixtures import urllib2_fixture from taskflow import engines import taskflow.exceptions as taskflow_exc @@ -32,7 +32,7 @@ class CreateClusterTests(base.FunctionalTestCase): additional_fixtures = [ nova.NovaClient, neutron.NeutronClient, - telnet.TelnetClient + urllib2_fixture.Urllib2Fixture ] def setUp(self): @@ -66,6 +66,19 @@ class CreateClusterTests(base.FunctionalTestCase): name=management_network_name) self.management_network = network_list['networks'][0] + # Todo(Dan) If testing becomes asynchronous, then there is no guarantee + # that these urllib return results will come in the proper order. Will + # have to update the urllib2 fixture to respond appropriately for the + # url passed in. + urllib2_fixture.Urllib2ResultDetails.set_urllib2_result( + ['{"status": "ok"}', + '[{"name": "/"}]', + '{"status": "ok"}', + '[{"name": "/"}]', + '{"status": "ok"}', + '[{"name": "/"}]'] + ) + def test_create_cluster(self): flow_store = { "image": self.valid_image.id, diff --git a/cue/tests/functional/taskflow/flow/test_delete_cluster.py b/cue/tests/functional/taskflow/flow/test_delete_cluster.py index f79ebf4d..ed0f7288 100644 --- a/cue/tests/functional/taskflow/flow/test_delete_cluster.py +++ b/cue/tests/functional/taskflow/flow/test_delete_cluster.py @@ -24,7 +24,7 @@ from cue.taskflow.flow import delete_cluster from cue.tests.functional import base from cue.tests.functional.fixtures import neutron from cue.tests.functional.fixtures import nova -from cue.tests.functional.fixtures import telnet +from cue.tests.functional.fixtures import urllib2_fixture from taskflow import engines import taskflow.exceptions as taskflow_exc @@ -34,7 +34,7 @@ class DeleteClusterTests(base.FunctionalTestCase): additional_fixtures = [ nova.NovaClient, neutron.NeutronClient, - telnet.TelnetClient + urllib2_fixture.Urllib2Fixture ] def setUp(self): @@ -68,6 +68,19 @@ class DeleteClusterTests(base.FunctionalTestCase): name=management_network_name) self.management_network = network_list['networks'][0] + # Todo(Dan) If testing becomes asynchronous, then there is no guarantee + # that these urllib return results will come in the proper order. Will + # have to update the urllib2 fixture to respond appropriately for the + # url passed in. + urllib2_fixture.Urllib2ResultDetails.set_urllib2_result( + ['{"status": "ok"}', + '[{"name": "/"}]', + '{"status": "ok"}', + '[{"name": "/"}]', + '{"status": "ok"}', + '[{"name": "/"}]'] + ) + def test_delete_cluster(self): flow_store_create = { "image": self.valid_image.id, diff --git a/cue/tests/functional/taskflow/task/test_check_or_restart_rabbitmq.py b/cue/tests/functional/taskflow/task/test_check_or_restart_rabbitmq.py deleted file mode 100644 index 50610b7b..00000000 --- a/cue/tests/functional/taskflow/task/test_check_or_restart_rabbitmq.py +++ /dev/null @@ -1,110 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import cue.client as client -import cue.taskflow.task as cue_task -from cue.tests.functional import base -from cue.tests.functional.fixtures import nova -from cue.tests.functional.fixtures import telnet - -from taskflow import engines -from taskflow.patterns import linear_flow -import taskflow.retry as retry - -import uuid - - -class CheckOrRestartRabbitTest(base.FunctionalTestCase): - additional_fixtures = [ - nova.NovaClient, - telnet.TelnetClient - ] - - task_store = { - 'vm_ip': "0.0.0.0", - 'vm_info': str(uuid.uuid4()), - 'port': '5672' - } - - def setUp(self): - super(CheckOrRestartRabbitTest, self).setUp() - - self.nova_client = client.nova_client() - - rabbitmq_retry_strategy = [ - 'check', - 'check', - 'check', - 'check', - 'restart', - ] - - retry_controller = retry.ForEach(rabbitmq_retry_strategy, - "retry check RabbitMQ", - provides="retry_strategy") - self.flow = linear_flow.Flow(name="wait for RabbitMQ ready state", - retry=retry_controller).add( - cue_task.CheckOrRestartRabbitMq( - os_client=self.nova_client, - name="get RabbitMQ status %s", - rebind={'action': "retry_strategy"}, - retry_delay_seconds=1)) - - def test_get_rabbit_status(self): - """Verifies GetRabbitVmStatus task directly.""" - - # start engine to run task - engines.run(self.flow, store=CheckOrRestartRabbitTest.task_store) - - def test_get_vm_status_flow(self): - """Verifies GetRabbitVmStatus in a successful retry flow. - - This test simulates creating a cluster, then directly running a flow - which will fail until telnet_status acquired from get_rabbit_vm_status - task returns 'connect'. Attempting the telnet connection will return - 'wait' for the first three times, then will return 'connected' once a - telnet connection is made. The node status should be in Active state. - """ - # configure custom vm_status list - telnet.TelnetStatusDetails.set_telnet_status(['connected', - 'wait', - 'wait', - 'wait']) - - # create flow with "GetRabbitVmStatus" task - # start engine to run task - engines.run(self.flow, store=CheckOrRestartRabbitTest.task_store) - self.assertFalse(self.nova_client.servers.reboot.called) - - def test_get_vm_status_flow_fail(self): - """Verifies GetRabbitVmStatus in an unsuccessful retry flow. - - This test simulates creating a cluster, then directly running a flow - which will fail until the retry count has been exhausted. Attempting - the telnet connection will return 'wait' until retry count reaches - limit and flow fails. The node status should remain in Building state. - """ - # configure custom vm_status list - telnet.TelnetStatusDetails.set_telnet_status(['wait', - 'wait', - 'wait', - 'wait', - 'wait', - 'wait']) - - # start engine to run task - self.assertRaises(IOError, engines.run, self.flow, - store=CheckOrRestartRabbitTest.task_store) - self.assertTrue(self.nova_client.servers.reboot.called) diff --git a/cue/tests/functional/taskflow/task/test_get_rabbit_cluster_status.py b/cue/tests/functional/taskflow/task/test_get_rabbit_cluster_status.py new file mode 100644 index 00000000..e3a46a53 --- /dev/null +++ b/cue/tests/functional/taskflow/task/test_get_rabbit_cluster_status.py @@ -0,0 +1,90 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from cue.taskflow.task import get_rabbit_cluster_status +from cue.tests.functional import base +from cue.tests.functional.fixtures import urllib2_fixture as urllib2_fixture + +from taskflow import engines +from taskflow.patterns import linear_flow + +import urllib2 + + +class GetRabbitClusterStatusTest(base.FunctionalTestCase): + additional_fixtures = [ + urllib2_fixture.Urllib2Fixture + ] + + task_store = { + "vm_ip": "0.0.0.0", + "default_rabbit_user": "user", + "default_rabbit_pass": "pass" + } + + def setUp(self): + super(GetRabbitClusterStatusTest, self).setUp() + + def test_get_rabbit_cluster_status(self): + """Verifies GetRabbitClusterStatus task.""" + + urllib2_fixture.Urllib2ResultDetails.set_urllib2_result( + ['{"status": "ok"}', + '{"status": "clustering"}', + '[{"name": "/"}]'] + ) + + self.flow = linear_flow.Flow(name="wait for RabbitMQ ready state").add( + get_rabbit_cluster_status.GetRabbitClusterStatus( + name="get RabbitMQ status")) + + # start engine + engines.run(self.flow, store=GetRabbitClusterStatusTest.task_store) + + def test_get_rabbit_cluster_status_fail(self): + """Verifies unsuccessful path. + + This test simulates when the RMQ vm is not responding to the + urllib2.open calls on the RMQ management port. + """ + urllib2_fixture.Urllib2ResultDetails.set_urllib2_result( + ['{}', + '[]'] + ) + + self.flow = linear_flow.Flow(name="wait for RabbitMQ ready state").add( + get_rabbit_cluster_status.GetRabbitClusterStatus( + name="get RabbitMQ status")) + + # start engine + engines.run(self.flow, store=GetRabbitClusterStatusTest.task_store) + + def test_get_rabbit_cluster_status_connection_failed(self): + """Verifies GetRabbitClusterStatus task.""" + + urllib2_fixture.Urllib2ResultDetails.set_urllib2_result( + ['URLError'] + ) + + self.flow = linear_flow.Flow(name="wait for RabbitMQ ready state").add( + get_rabbit_cluster_status.GetRabbitClusterStatus( + name="get RabbitMQ status")) + + try: + # start engine + engines.run(self.flow, store=GetRabbitClusterStatusTest.task_store) + except urllib2.URLError: + # Expected + pass \ No newline at end of file diff --git a/cue/tests/functional/taskflow/task/test_get_rabbit_vm_status.py b/cue/tests/functional/taskflow/task/test_get_rabbit_vm_status.py deleted file mode 100644 index d5357ab9..00000000 --- a/cue/tests/functional/taskflow/task/test_get_rabbit_vm_status.py +++ /dev/null @@ -1,99 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from cue.tests.functional import base -from cue.tests.functional.fixtures import telnet -import os_tasklib.common.verify_network_task as verify_network_task - -from taskflow import engines -from taskflow.patterns import linear_flow -import taskflow.retry as retry - - -class GetRabbitVmStatusTest(base.FunctionalTestCase): - additional_fixtures = [ - telnet.TelnetClient - ] - - task_store = { - 'vm_ip': "0.0.0.0", - 'port': '5672' - } - - def setUp(self): - super(GetRabbitVmStatusTest, self).setUp() - - def test_get_rabbit_status(self): - """Verifies GetRabbitVmStatus task directly.""" - - # create flow with "GetRabbitVmStatus" task - self.flow = linear_flow.Flow(name="wait for RabbitMQ ready state", - retry=retry.Times(5)).add( - verify_network_task.VerifyNetwork( - name="get RabbitMQ status", - retry_delay_seconds=1)) - - # start engine to run task - engines.run(self.flow, store=GetRabbitVmStatusTest.task_store) - - def test_get_vm_status_flow(self): - """Verifies GetRabbitVmStatus in a successful retry flow. - - This test simulates creating a cluster, then directly running a flow - which will fail until telnet_status acquired from get_rabbit_vm_status - task returns 'connect'. Attempting the telnet connection will return - 'wait' for the first three times, then will return 'connected' once a - telnet connection is made. The node status should be in Active state. - """ - # configure custom vm_status list - telnet.TelnetStatusDetails.set_telnet_status(['connected', - 'wait', - 'wait', - 'wait']) - - # create flow with "GetRabbitVmStatus" task - self.flow = linear_flow.Flow(name="wait for RabbitMQ ready state", - retry=retry.Times(5)).add( - verify_network_task.VerifyNetwork( - name="get RabbitMQ status", - retry_delay_seconds=1)) - - # start engine to run task - engines.run(self.flow, store=GetRabbitVmStatusTest.task_store) - - def test_get_vm_status_flow_fail(self): - """Verifies GetRabbitVmStatus in an unsuccessful retry flow. - - This test simulates creating a cluster, then directly running a flow - which will fail until the retry count has been exhausted. Attempting - the telnet connection will return 'wait' until retry count reaches - limit and flow fails. The node status should remain in Building state. - """ - # configure custom vm_status list - telnet.TelnetStatusDetails.set_telnet_status(['wait', - 'wait', - 'wait', - 'wait']) - - # create flow with "GetRabbitVmStatus" task - self.flow = linear_flow.Flow(name="wait for RabbitMQ ready state", - retry=retry.Times(3)).add( - verify_network_task.VerifyNetwork( - name="get RabbitMQ status", - retry_delay_seconds=1)) - - # start engine to run task - self.assertRaises(IOError, engines.run, self.flow, - store=GetRabbitVmStatusTest.task_store) \ No newline at end of file diff --git a/os_tasklib/common/__init__.py b/os_tasklib/common/__init__.py index 6bf4d11b..da298c66 100644 --- a/os_tasklib/common/__init__.py +++ b/os_tasklib/common/__init__.py @@ -18,4 +18,3 @@ from check_for import CheckFor # noqa from lambda_task import Lambda # noqa from map_task import Map # noqa from reduce_task import Reduce # noqa -from verify_network_task import VerifyNetwork # noqa diff --git a/os_tasklib/common/verify_network_task.py b/os_tasklib/common/verify_network_task.py deleted file mode 100644 index 394861fa..00000000 --- a/os_tasklib/common/verify_network_task.py +++ /dev/null @@ -1,75 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import telnetlib as telnet -import time - -import six -import taskflow.task - - -class VerifyNetwork(taskflow.task.Task): - def __init__(self, - retry_delay_seconds=None, - retry_delay_ms=None, - name=None, - **kwargs): - """Constructor - - This constructor sets the retry delay for this task's revert method. - - :param retry_delay_seconds: retry delay in seconds - :param retry_delay_ms: retry delay in milliseconds - :param name: unique name for atom - """ - super(VerifyNetwork, self).__init__(name=name, **kwargs) - - self.sleep_time = 0 - if retry_delay_seconds: - self.sleep_time = retry_delay_seconds - - if retry_delay_ms: - self.sleep_time += retry_delay_ms / 1000.0 - - def execute(self, vm_ip, port, **kwargs): - """Main execute method to verify network connection in a VM - - :param vm_ip: VM ip address - :type vm_ip: string - :param port: host service port - :type port: int - """ - if six.PY2 and isinstance(port, unicode): - check_port = port.encode() - else: - check_port = port - tn = telnet.Telnet() - tn.open(vm_ip, check_port, timeout=10) - - def revert(self, *args, **kwargs): - """Revert CreateVmTask - - This method is executed upon failure of the GetRabbitVmStatus or the - Flow that the Task is part of. - - :param args: positional arguments that the task required to execute. - :type args: list - :param kwargs: keyword arguments that the task required to execute; the - special key `result` will contain the :meth:`execute` - results (if any) and the special key `flow_failures` - will contain any failure information. - """ - if self.sleep_time != 0: - time.sleep(self.sleep_time)