From 6d1bc2ab063c2945286d6129f774eae92e81192a Mon Sep 17 00:00:00 2001 From: pongubal Date: Wed, 9 Dec 2015 12:10:57 -0800 Subject: [PATCH] Abort VM Status flow when VM(s) go to ERROR state While waiting for VMs to go ACTIVE during the create cluster flow, if one or more VMs go into ERROR state the VM check task continues until it times out. If one or more VM(s) go into ERROR state, the flow should be aborted instead of waiting for that VM to go into ACTIVE state. Change-Id: I9e3768c5a55803afe05f75fad41ecf74471b4694 Closes-Bug: 1522633 --- cue/common/exception.py | 8 + cue/taskflow/flow/create_cluster_node.py | 15 +- cue/taskflow/retry/__init__.py | 16 ++ cue/taskflow/retry/exception_times.py | 54 ++++++ cue/taskflow/task/__init__.py | 5 +- cue/taskflow/task/check_for_vm_status.py | 57 ++++++ cue/tests/functional/fixtures/nova.py | 4 +- .../taskflow/flow/test_create_cluster.py | 179 +++++++++++++++++- .../taskflow/task/test_get_vm_status.py | 16 +- .../unit/taskflow/test_check_for_vm_status.py | 51 +++++ cue/tests/unit/taskflow/test_errored_times.py | 44 +++++ os_tasklib/common/__init__.py | 10 +- 12 files changed, 435 insertions(+), 24 deletions(-) create mode 100644 cue/taskflow/retry/__init__.py create mode 100644 cue/taskflow/retry/exception_times.py create mode 100644 cue/taskflow/task/check_for_vm_status.py create mode 100644 cue/tests/unit/taskflow/test_check_for_vm_status.py create mode 100644 cue/tests/unit/taskflow/test_errored_times.py diff --git a/cue/common/exception.py b/cue/common/exception.py index 22e9f779..569402be 100644 --- a/cue/common/exception.py +++ b/cue/common/exception.py @@ -141,3 +141,11 @@ class NodeAlreadyExists(Conflict): class ConfigurationError(CueException): message = _("Configuration Error") + + +class VmBuildingException(CueException): + message = _("VM is in building state") + + +class VmErrorException(CueException): + message = _("VM is not in a building state") diff --git a/cue/taskflow/flow/create_cluster_node.py 
b/cue/taskflow/flow/create_cluster_node.py index 1623ed47..6c9aa74c 100644 --- a/cue/taskflow/flow/create_cluster_node.py +++ b/cue/taskflow/flow/create_cluster_node.py @@ -15,10 +15,11 @@ from oslo_config import cfg import taskflow.patterns.linear_flow as linear_flow -import taskflow.retry as retry import cue.client as client +from cue.common import exception as cue_exceptions from cue.db.sqlalchemy import models +import cue.taskflow.retry.exception_times as retry import cue.taskflow.task as cue_tasks import os_tasklib.common as os_common import os_tasklib.neutron as neutron @@ -132,26 +133,32 @@ def create_cluster_node(cluster_id, node_number, node_id, graph_flow, name="extract vm id %s" % node_name, rebind={'vm_info': "vm_info_%d" % node_number}, provides="vm_id_%d" % node_number) + graph_flow.add(get_vm_id) graph_flow.link(create_vm, get_vm_id) retry_count = CONF.flow_options.create_cluster_node_vm_active_retry_count check_vm_active = linear_flow.Flow( name="wait for VM active state %s" % node_name, - retry=retry.Times(retry_count, revert_all=True)) + retry=retry.ExceptionTimes( + revert_exception_list=[cue_exceptions.VmErrorException], + attempts=retry_count, + revert_all=True) + ) + check_vm_active.add( nova.GetVmStatus( os_client=client.nova_client(), name="get vm %s" % node_name, rebind={'nova_vm_id': "vm_id_%d" % node_number}, provides="vm_status_%d" % node_number), - os_common.CheckFor( + cue_tasks.CheckForVmStatus( name="check vm status %s" % node_name, details="waiting for ACTIVE VM status", rebind={'check_var': "vm_status_%d" % node_number}, - check_value='ACTIVE', retry_delay_seconds=10), ) + graph_flow.add(check_vm_active) graph_flow.link(get_vm_id, check_vm_active) diff --git a/cue/taskflow/retry/__init__.py b/cue/taskflow/retry/__init__.py new file mode 100644 index 00000000..3f187a73 --- /dev/null +++ b/cue/taskflow/retry/__init__.py @@ -0,0 +1,16 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 Hewlett-Packard Development Company, L.P. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from cue.taskflow.retry.exception_times import ExceptionTimes # noqa diff --git a/cue/taskflow/retry/exception_times.py b/cue/taskflow/retry/exception_times.py new file mode 100644 index 00000000..1e3e8c73 --- /dev/null +++ b/cue/taskflow/retry/exception_times.py @@ -0,0 +1,54 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_log import log as logging +import taskflow.retry as retry + +LOG = logging.getLogger(__name__) + + +class ExceptionTimes(retry.Times): + """Retries subflow given number of times. Returns attempt number. 
+ + :param attempts: number of attempts to retry the associated subflow + before giving up + :type attempts: int + :param revert_all: when provided this will cause the full flow to revert + when the number of attempts that have been tried + has been reached (when false, it will only locally + revert the associated subflow) + :type revert_all: bool + Further arguments are interpreted as defined in the + :py:class:`~taskflow.atom.Atom` constructor. + """ + + def __init__(self, revert_exception_list=None, **kwargs): + super(ExceptionTimes, self).__init__(**kwargs) + if revert_exception_list: + self._revert_exception_list = revert_exception_list + else: + self._revert_exception_list = [] + + def on_failure(self, history, *args, **kwargs): + + (owner, outcome) = history.outcomes_iter(len(history) - 1).next() + + if type(outcome.exception) in self._revert_exception_list: + return self._revert_action + + return super(ExceptionTimes, self).on_failure(history, args, kwargs) + + def execute(self, history, *args, **kwargs): + return len(history) + 1 diff --git a/cue/taskflow/task/__init__.py b/cue/taskflow/task/__init__.py index 83157bce..8f732b54 100644 --- a/cue/taskflow/task/__init__.py +++ b/cue/taskflow/task/__init__.py @@ -13,10 +13,11 @@ # License for the specific language governing permissions and limitations # under the License. 
+from cue.taskflow.task.check_for_vm_status import CheckForVmStatus # noqa from cue.taskflow.task.cluster_node_userdata import ClusterNodeUserData # noqa from cue.taskflow.task.create_endpoint_task import CreateEndpoint # noqa from cue.taskflow.task.get_node import GetNode # noqa from cue.taskflow.task.get_rabbit_cluster_status import GetRabbitClusterStatus # noqa from cue.taskflow.task.update_cluster_record_task import UpdateClusterRecord # noqa -from cue.taskflow.task.update_endpoints_record_task import UpdateEndpointsRecord # noqa -from cue.taskflow.task.update_node_record_task import UpdateNodeRecord # noqa +from cue.taskflow.task.update_endpoints_record_task import UpdateEndpointsRecord # noqa +from cue.taskflow.task.update_node_record_task import UpdateNodeRecord # noqa diff --git a/cue/taskflow/task/check_for_vm_status.py b/cue/taskflow/task/check_for_vm_status.py new file mode 100644 index 00000000..a5bbc05f --- /dev/null +++ b/cue/taskflow/task/check_for_vm_status.py @@ -0,0 +1,57 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +import time + +import taskflow.task as task + +from cue.common import exception as cue_exceptions + + +class CheckForVmStatus(task.Task): + def __init__(self, + check_value_building='BUILD', + check_value_active='ACTIVE', + retry_delay_seconds=None, + retry_delay_ms=None, + name=None, + details=None, + **kwargs): + super(CheckForVmStatus, self).__init__(name=name, **kwargs) + + self.check_value_building = check_value_building + self.check_value_active = check_value_active + self.sleep_time = 0 + self.details = details + if retry_delay_seconds: + self.sleep_time = retry_delay_seconds + + if retry_delay_ms: + self.sleep_time += retry_delay_ms / 1000.0 + + def execute(self, check_var, **kwargs): + error_string = "expected %s, got %s" % (self.check_value_active, + check_var) + if self.details is not None: + error_string += ", message: %s" % self.details + + if check_var == self.check_value_building: + raise cue_exceptions.VmBuildingException(error_string) + elif check_var != self.check_value_active: + raise cue_exceptions.VmErrorException(error_string) + + def revert(self, check_var, *args, **kwargs): + if self.sleep_time != 0: + time.sleep(self.sleep_time) \ No newline at end of file diff --git a/cue/tests/functional/fixtures/nova.py b/cue/tests/functional/fixtures/nova.py index ac5480d1..edb0a914 100644 --- a/cue/tests/functional/fixtures/nova.py +++ b/cue/tests/functional/fixtures/nova.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. 
+import copy import uuid import novaclient.exceptions as nova_exc @@ -87,8 +88,7 @@ class VmStatusDetails(object): :param statuses: list of statuses """ - for status in statuses: - VmStatusDetails.vm_status_list.append(status) + VmStatusDetails.vm_status_list = copy.deepcopy(statuses) @staticmethod def get_status(): diff --git a/cue/tests/functional/taskflow/flow/test_create_cluster.py b/cue/tests/functional/taskflow/flow/test_create_cluster.py index 40904375..a5a2a363 100644 --- a/cue/tests/functional/taskflow/flow/test_create_cluster.py +++ b/cue/tests/functional/taskflow/flow/test_create_cluster.py @@ -16,8 +16,12 @@ import uuid from neutronclient.common import exceptions as neutron_exceptions +from oslo_config import cfg +from taskflow import engines +import taskflow.exceptions as taskflow_exc from cue import client +from cue.common import exception as cue_exceptions from cue.db.sqlalchemy import models from cue import objects from cue.taskflow.flow import create_cluster @@ -26,8 +30,8 @@ from cue.tests.functional.fixtures import neutron from cue.tests.functional.fixtures import nova from cue.tests.functional.fixtures import urllib2_fixture -from taskflow import engines -import taskflow.exceptions as taskflow_exc + +CONF = cfg.CONF class CreateClusterTests(base.FunctionalTestCase): @@ -151,6 +155,175 @@ class CreateClusterTests(base.FunctionalTestCase): self.assertEqual(expected_management_ip, actual_management_ip, "invalid management ip") + def test_create_cluster_nova_error(self): + flow_store = { + "tenant_id": str(self.valid_network['tenant_id']), + "image": self.valid_image.id, + "flavor": self.valid_flavor.id, + "port": self.port, + "context": self.context.to_dict(), + "erlang_cookie": str(uuid.uuid4()), + "default_rabbit_user": 'rabbit', + "default_rabbit_pass": str(uuid.uuid4()), + } + + cluster_values = { + "project_id": self.context.tenant_id, + "name": "RabbitCluster", + "network_id": self.valid_network['id'], + "flavor": "1", + "size": 3, + } + + 
new_cluster = objects.Cluster(**cluster_values) + new_cluster.create(self.context) + + nodes = objects.Node.get_nodes_by_cluster_id(self.context, + new_cluster.id) + + CONF.flow_options.create_cluster_node_vm_active_retry_count = 3 + + # configure custom vm_status list + nova.VmStatusDetails.set_vm_status(['ACTIVE', + 'ERROR', + 'BUILD', + 'BUILD']) + + node_ids = [] + for node in nodes: + node_ids.append(node.id) + + flow = create_cluster(new_cluster.id, + node_ids, + self.valid_network['id'], + self.management_network['id']) + + try: + engines.run(flow, store=flow_store) + except taskflow_exc.WrappedFailure as err: + self.assertEqual(3, len(err._causes)) + exc_list = [type(c.exception) for c in err._causes] + self.assertEqual(sorted([cue_exceptions.VmErrorException, + cue_exceptions.VmBuildingException, + cue_exceptions.VmBuildingException]), + sorted(exc_list)) + except Exception as e: + self.assertEqual(taskflow_exc.WrappedFailure, type(e)) + else: + self.fail("Expected taskflow_exc.WrappedFailure exception.") + + def test_create_cluster_max_retries_multi_node_single_retry(self): + flow_store = { + "tenant_id": str(self.valid_network['tenant_id']), + "image": self.valid_image.id, + "flavor": self.valid_flavor.id, + "port": self.port, + "context": self.context.to_dict(), + "erlang_cookie": str(uuid.uuid4()), + "default_rabbit_user": 'rabbit', + "default_rabbit_pass": str(uuid.uuid4()), + } + + cluster_values = { + "project_id": self.context.tenant_id, + "name": "RabbitCluster", + "network_id": self.valid_network['id'], + "flavor": "1", + "size": 3, + } + + new_cluster = objects.Cluster(**cluster_values) + new_cluster.create(self.context) + + nodes = objects.Node.get_nodes_by_cluster_id(self.context, + new_cluster.id) + + # Todo: Raise the retry count once the fixture timeout issue + # is resolved + CONF.flow_options.create_cluster_node_vm_active_retry_count = 1 + + # configure custom vm_status list + nova.VmStatusDetails.set_vm_status(['BUILD', + 'BUILD', + 
'BUILD', + 'BUILD']) + + node_ids = [] + for node in nodes: + node_ids.append(node.id) + + flow = create_cluster(new_cluster.id, + node_ids, + self.valid_network['id'], + self.management_network['id']) + + try: + engines.run(flow, store=flow_store) + except taskflow_exc.WrappedFailure as err: + self.assertEqual(3, len(err._causes)) + exc_list = [type(c.exception) for c in err._causes] + self.assertEqual([cue_exceptions.VmBuildingException, + cue_exceptions.VmBuildingException, + cue_exceptions.VmBuildingException], + exc_list) + except Exception as e: + self.assertEqual(taskflow_exc.WrappedFailure, type(e)) + else: + self.fail("Expected taskflow_exc.WrappedFailure exception.") + + def test_create_cluster_max_retries_single_node(self): + flow_store = { + "tenant_id": str(self.valid_network['tenant_id']), + "image": self.valid_image.id, + "flavor": self.valid_flavor.id, + "port": self.port, + "context": self.context.to_dict(), + "erlang_cookie": str(uuid.uuid4()), + "default_rabbit_user": 'rabbit', + "default_rabbit_pass": str(uuid.uuid4()), + } + + cluster_values = { + "project_id": self.context.tenant_id, + "name": "RabbitCluster", + "network_id": self.valid_network['id'], + "flavor": "1", + "size": 1, + } + + new_cluster = objects.Cluster(**cluster_values) + new_cluster.create(self.context) + + nodes = objects.Node.get_nodes_by_cluster_id(self.context, + new_cluster.id) + + CONF.flow_options.create_cluster_node_vm_active_retry_count = 3 + + # configure custom vm_status list + nova.VmStatusDetails.set_vm_status(['BUILD', + 'BUILD', + 'BUILD', + 'BUILD']) + + node_ids = [] + for node in nodes: + node_ids.append(node.id) + + flow = create_cluster(new_cluster.id, + node_ids, + self.valid_network['id'], + self.management_network['id']) + + self.assertRaises(cue_exceptions.VmBuildingException, + engines.run, flow, store=flow_store) + + def test_create_cluster_max_retries_multi_node_multi_retry(self): + #Todo - This test is stubbed due to issues with the fixture timeout 
+ # configuration, which interrupts the this test. The timeout needs + # to be solved and this test needs to be implemented to give full + # confidence in the retry feature. + pass + def test_create_cluster_overlimit(self): vm_list = self.nova_client.servers.list() port_list = self.neutron_client.list_ports() @@ -296,4 +469,4 @@ class CreateClusterTests(base.FunctionalTestCase): def tearDown(self): for vm_id in self.new_vm_list: self.nova_client.servers.delete(vm_id) - super(CreateClusterTests, self).tearDown() + super(CreateClusterTests, self).tearDown() \ No newline at end of file diff --git a/cue/tests/functional/taskflow/task/test_get_vm_status.py b/cue/tests/functional/taskflow/task/test_get_vm_status.py index 1fdd66a6..266decf6 100644 --- a/cue/tests/functional/taskflow/task/test_get_vm_status.py +++ b/cue/tests/functional/taskflow/task/test_get_vm_status.py @@ -84,9 +84,9 @@ class GetVmStatusTests(base.FunctionalTestCase): """ # configure custom vm_status list nova.VmStatusDetails.set_vm_status(['ACTIVE', - 'BUILDING', - 'BUILDING', - 'BUILDING']) + 'BUILD', + 'BUILD', + 'BUILD']) # create flow with "GetVmStatus" task self.flow = linear_flow.Flow('wait for vm to become active', @@ -125,11 +125,11 @@ class GetVmStatusTests(base.FunctionalTestCase): failure. """ # configure custom vm_status list - nova.VmStatusDetails.set_vm_status(['BUILDING', - 'BUILDING', - 'BUILDING', - 'BUILDING', - 'BUILDING', + nova.VmStatusDetails.set_vm_status(['BUILD', + 'BUILD', + 'BUILD', + 'BUILD', + 'BUILD', 'ERROR', 'ERROR']) diff --git a/cue/tests/unit/taskflow/test_check_for_vm_status.py b/cue/tests/unit/taskflow/test_check_for_vm_status.py new file mode 100644 index 00000000..f40adc09 --- /dev/null +++ b/cue/tests/unit/taskflow/test_check_for_vm_status.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 Hewlett-Packard Development Company, L.P. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from cue.common import exception as cue_exceptions +import cue.taskflow.task.check_for_vm_status as check_for_vm_status +from cue.tests.unit import base + + +class TaskflowCheckVmStatusTest(base.UnitTestCase): + def setUp(self): + super(TaskflowCheckVmStatusTest, self).setUp() + + def tearDown(self): + super(TaskflowCheckVmStatusTest, self).tearDown() + + @mock.patch('time.sleep') + def test_check_vm_status_ms(self, mock_sleep): + check_task = check_for_vm_status.CheckForVmStatus( + name="check vm status", + details="waiting for ACTIVE VM status", + retry_delay_ms=1000, + ) + check_task.revert('BUILD') + + self.assertEqual(1.0, mock_sleep.call_args[0][0]) + + def test_check_vm_status_no_details(self): + check_task = check_for_vm_status.CheckForVmStatus( + name="check vm status", + ) + try: + check_task.execute('BUILD') + except cue_exceptions.VmBuildingException as err: + self.assertEqual(cue_exceptions.VmBuildingException.message, + err.message) + else: + self.fail("Expected VmBuildingException") diff --git a/cue/tests/unit/taskflow/test_errored_times.py b/cue/tests/unit/taskflow/test_errored_times.py new file mode 100644 index 00000000..e40737a5 --- /dev/null +++ b/cue/tests/unit/taskflow/test_errored_times.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 Hewlett-Packard Development Company, L.P. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import taskflow.retry as tf_retry + +from cue.common import exception as cue_exceptions +import cue.taskflow.retry.exception_times as retry +from cue.tests.unit import base + + +class TaskflowExceptionTimesTest(base.UnitTestCase): + def setUp(self): + super(TaskflowExceptionTimesTest, self).setUp() + + def tearDown(self): + super(TaskflowExceptionTimesTest, self).tearDown() + + def test_revert_action(self): + + retry_exception_times = retry.ExceptionTimes( + revert_exception_list=[cue_exceptions.VmErrorException], + attempts=10, + revert_all=False) + self.assertEqual(tf_retry.REVERT, retry_exception_times._revert_action) + + def test_revert_action_empty_exception_list(self): + + retry_exception_times = retry.ExceptionTimes( + revert_exception_list=None, + attempts=10, + revert_all=False) + self.assertEqual([], retry_exception_times._revert_exception_list) \ No newline at end of file diff --git a/os_tasklib/common/__init__.py b/os_tasklib/common/__init__.py index 675612fb..ca7ced8f 100644 --- a/os_tasklib/common/__init__.py +++ b/os_tasklib/common/__init__.py @@ -13,8 +13,8 @@ # License for the specific language governing permissions and limitations # under the License. 
-from os_tasklib.common.assert_task import Assert # noqa -from os_tasklib.common.check_for import CheckFor # noqa -from os_tasklib.common.lambda_task import Lambda # noqa -from os_tasklib.common.map_task import Map # noqa -from os_tasklib.common.reduce_task import Reduce # noqa +from os_tasklib.common.assert_task import Assert # noqa +from os_tasklib.common.check_for import CheckFor # noqa +from os_tasklib.common.lambda_task import Lambda # noqa +from os_tasklib.common.map_task import Map # noqa +from os_tasklib.common.reduce_task import Reduce # noqa