Abort VM Status flow when VM(s) go to ERROR state

While waiting for VMs to go ACTIVE during the create cluster flow,
if one or more VMs go into ERROR state the VM check task continues
until it times out. If one or more VM(s) go into ERROR state, the flow
should be abort instead of waiting for that VM to go into ACTIVE state.

Change-Id: I9e3768c5a55803afe05f75fad41ecf74471b4694
Closes-Bug: 1522633
This commit is contained in:
pongubal 2015-12-09 12:10:57 -08:00 committed by Daniel Allegood
parent 4dcc53b385
commit 6d1bc2ab06
12 changed files with 435 additions and 24 deletions

View File

@ -141,3 +141,11 @@ class NodeAlreadyExists(Conflict):
class ConfigurationError(CueException):
message = _("Configuration Error")
class VmBuildingException(CueException):
message = _("VM is in building state")
class VmErrorException(CueException):
message = _("VM is not in a building state")

View File

@ -15,10 +15,11 @@
from oslo_config import cfg
import taskflow.patterns.linear_flow as linear_flow
import taskflow.retry as retry
import cue.client as client
from cue.common import exception as cue_exceptions
from cue.db.sqlalchemy import models
import cue.taskflow.retry.exception_times as retry
import cue.taskflow.task as cue_tasks
import os_tasklib.common as os_common
import os_tasklib.neutron as neutron
@ -132,26 +133,32 @@ def create_cluster_node(cluster_id, node_number, node_id, graph_flow,
name="extract vm id %s" % node_name,
rebind={'vm_info': "vm_info_%d" % node_number},
provides="vm_id_%d" % node_number)
graph_flow.add(get_vm_id)
graph_flow.link(create_vm, get_vm_id)
retry_count = CONF.flow_options.create_cluster_node_vm_active_retry_count
check_vm_active = linear_flow.Flow(
name="wait for VM active state %s" % node_name,
retry=retry.Times(retry_count, revert_all=True))
retry=retry.ExceptionTimes(
revert_exception_list=[cue_exceptions.VmErrorException],
attempts=retry_count,
revert_all=True)
)
check_vm_active.add(
nova.GetVmStatus(
os_client=client.nova_client(),
name="get vm %s" % node_name,
rebind={'nova_vm_id': "vm_id_%d" % node_number},
provides="vm_status_%d" % node_number),
os_common.CheckFor(
cue_tasks.CheckForVmStatus(
name="check vm status %s" % node_name,
details="waiting for ACTIVE VM status",
rebind={'check_var': "vm_status_%d" % node_number},
check_value='ACTIVE',
retry_delay_seconds=10),
)
graph_flow.add(check_vm_active)
graph_flow.link(get_vm_id, check_vm_active)

View File

@ -0,0 +1,16 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cue.taskflow.retry.exception_times import ExceptionTimes # noqa

View File

@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
import taskflow.retry as retry
LOG = logging.getLogger(__name__)
class ExceptionTimes(retry.Times):
"""Retries subflow given number of times. Returns attempt number.
:param attempts: number of attempts to retry the associated subflow
before giving up
:type attempts: int
:param revert_all: when provided this will cause the full flow to revert
when the number of attempts that have been tried
has been reached (when false, it will only locally
revert the associated subflow)
:type revert_all: bool
Further arguments are interpreted as defined in the
:py:class:`~taskflow.atom.Atom` constructor.
"""
def __init__(self, revert_exception_list=None, **kwargs):
super(ExceptionTimes, self).__init__(**kwargs)
if revert_exception_list:
self._revert_exception_list = revert_exception_list
else:
self._revert_exception_list = []
def on_failure(self, history, *args, **kwargs):
(owner, outcome) = history.outcomes_iter(len(history) - 1).next()
if type(outcome.exception) in self._revert_exception_list:
return self._revert_action
return super(ExceptionTimes, self).on_failure(history, args, kwargs)
def execute(self, history, *args, **kwargs):
return len(history) + 1

View File

@ -13,10 +13,11 @@
# License for the specific language governing permissions and limitations
# under the License.
from cue.taskflow.task.check_for_vm_status import CheckForVmStatus # noqa
from cue.taskflow.task.cluster_node_userdata import ClusterNodeUserData # noqa
from cue.taskflow.task.create_endpoint_task import CreateEndpoint # noqa
from cue.taskflow.task.get_node import GetNode # noqa
from cue.taskflow.task.get_rabbit_cluster_status import GetRabbitClusterStatus # noqa
from cue.taskflow.task.update_cluster_record_task import UpdateClusterRecord # noqa
from cue.taskflow.task.update_endpoints_record_task import UpdateEndpointsRecord # noqa
from cue.taskflow.task.update_node_record_task import UpdateNodeRecord # noqa
from cue.taskflow.task.update_endpoints_record_task import UpdateEndpointsRecord # noqa
from cue.taskflow.task.update_node_record_task import UpdateNodeRecord # noqa

View File

@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
import taskflow.task as task
from cue.common import exception as cue_exceptions
class CheckForVmStatus(task.Task):
def __init__(self,
check_value_building='BUILD',
check_value_active='ACTIVE',
retry_delay_seconds=None,
retry_delay_ms=None,
name=None,
details=None,
**kwargs):
super(CheckForVmStatus, self).__init__(name=name, **kwargs)
self.check_value_building = check_value_building
self.check_value_active = check_value_active
self.sleep_time = 0
self.details = details
if retry_delay_seconds:
self.sleep_time = retry_delay_seconds
if retry_delay_ms:
self.sleep_time += retry_delay_ms / 1000.0
def execute(self, check_var, **kwargs):
error_string = "expected %s, got %s" % (self.check_value_active,
check_var)
if self.details is not None:
error_string += ", message: %s" % self.details
if check_var == self.check_value_building:
raise cue_exceptions.VmBuildingException(error_string)
elif check_var != self.check_value_active:
raise cue_exceptions.VmErrorException(error_string)
def revert(self, check_var, *args, **kwargs):
if self.sleep_time != 0:
time.sleep(self.sleep_time)

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
import uuid
import novaclient.exceptions as nova_exc
@ -87,8 +88,7 @@ class VmStatusDetails(object):
:param statuses: list of statuses
"""
for status in statuses:
VmStatusDetails.vm_status_list.append(status)
VmStatusDetails.vm_status_list = copy.deepcopy(statuses)
@staticmethod
def get_status():

View File

@ -16,8 +16,12 @@
import uuid
from neutronclient.common import exceptions as neutron_exceptions
from oslo_config import cfg
from taskflow import engines
import taskflow.exceptions as taskflow_exc
from cue import client
from cue.common import exception as cue_exceptions
from cue.db.sqlalchemy import models
from cue import objects
from cue.taskflow.flow import create_cluster
@ -26,8 +30,8 @@ from cue.tests.functional.fixtures import neutron
from cue.tests.functional.fixtures import nova
from cue.tests.functional.fixtures import urllib2_fixture
from taskflow import engines
import taskflow.exceptions as taskflow_exc
CONF = cfg.CONF
class CreateClusterTests(base.FunctionalTestCase):
@ -151,6 +155,175 @@ class CreateClusterTests(base.FunctionalTestCase):
self.assertEqual(expected_management_ip, actual_management_ip,
"invalid management ip")
def test_create_cluster_nova_error(self):
flow_store = {
"tenant_id": str(self.valid_network['tenant_id']),
"image": self.valid_image.id,
"flavor": self.valid_flavor.id,
"port": self.port,
"context": self.context.to_dict(),
"erlang_cookie": str(uuid.uuid4()),
"default_rabbit_user": 'rabbit',
"default_rabbit_pass": str(uuid.uuid4()),
}
cluster_values = {
"project_id": self.context.tenant_id,
"name": "RabbitCluster",
"network_id": self.valid_network['id'],
"flavor": "1",
"size": 3,
}
new_cluster = objects.Cluster(**cluster_values)
new_cluster.create(self.context)
nodes = objects.Node.get_nodes_by_cluster_id(self.context,
new_cluster.id)
CONF.flow_options.create_cluster_node_vm_active_retry_count = 3
# configure custom vm_status list
nova.VmStatusDetails.set_vm_status(['ACTIVE',
'ERROR',
'BUILD',
'BUILD'])
node_ids = []
for node in nodes:
node_ids.append(node.id)
flow = create_cluster(new_cluster.id,
node_ids,
self.valid_network['id'],
self.management_network['id'])
try:
engines.run(flow, store=flow_store)
except taskflow_exc.WrappedFailure as err:
self.assertEqual(3, len(err._causes))
exc_list = [type(c.exception) for c in err._causes]
self.assertEqual(sorted([cue_exceptions.VmErrorException,
cue_exceptions.VmBuildingException,
cue_exceptions.VmBuildingException]),
sorted(exc_list))
except Exception as e:
self.assertEqual(taskflow_exc.WrappedFailure, type(e))
else:
self.fail("Expected taskflow_exc.WrappedFailure exception.")
def test_create_cluster_max_retries_multi_node_single_retry(self):
flow_store = {
"tenant_id": str(self.valid_network['tenant_id']),
"image": self.valid_image.id,
"flavor": self.valid_flavor.id,
"port": self.port,
"context": self.context.to_dict(),
"erlang_cookie": str(uuid.uuid4()),
"default_rabbit_user": 'rabbit',
"default_rabbit_pass": str(uuid.uuid4()),
}
cluster_values = {
"project_id": self.context.tenant_id,
"name": "RabbitCluster",
"network_id": self.valid_network['id'],
"flavor": "1",
"size": 3,
}
new_cluster = objects.Cluster(**cluster_values)
new_cluster.create(self.context)
nodes = objects.Node.get_nodes_by_cluster_id(self.context,
new_cluster.id)
# Todo: Raise the retry count once the fixture timeout issue
# is resolved
CONF.flow_options.create_cluster_node_vm_active_retry_count = 1
# configure custom vm_status list
nova.VmStatusDetails.set_vm_status(['BUILD',
'BUILD',
'BUILD',
'BUILD'])
node_ids = []
for node in nodes:
node_ids.append(node.id)
flow = create_cluster(new_cluster.id,
node_ids,
self.valid_network['id'],
self.management_network['id'])
try:
engines.run(flow, store=flow_store)
except taskflow_exc.WrappedFailure as err:
self.assertEqual(3, len(err._causes))
exc_list = [type(c.exception) for c in err._causes]
self.assertEqual([cue_exceptions.VmBuildingException,
cue_exceptions.VmBuildingException,
cue_exceptions.VmBuildingException],
exc_list)
except Exception as e:
self.assertEqual(taskflow_exc.WrappedFailure, type(e))
else:
self.fail("Expected taskflow_exc.WrappedFailure exception.")
def test_create_cluster_max_retries_single_node(self):
flow_store = {
"tenant_id": str(self.valid_network['tenant_id']),
"image": self.valid_image.id,
"flavor": self.valid_flavor.id,
"port": self.port,
"context": self.context.to_dict(),
"erlang_cookie": str(uuid.uuid4()),
"default_rabbit_user": 'rabbit',
"default_rabbit_pass": str(uuid.uuid4()),
}
cluster_values = {
"project_id": self.context.tenant_id,
"name": "RabbitCluster",
"network_id": self.valid_network['id'],
"flavor": "1",
"size": 1,
}
new_cluster = objects.Cluster(**cluster_values)
new_cluster.create(self.context)
nodes = objects.Node.get_nodes_by_cluster_id(self.context,
new_cluster.id)
CONF.flow_options.create_cluster_node_vm_active_retry_count = 3
# configure custom vm_status list
nova.VmStatusDetails.set_vm_status(['BUILD',
'BUILD',
'BUILD',
'BUILD'])
node_ids = []
for node in nodes:
node_ids.append(node.id)
flow = create_cluster(new_cluster.id,
node_ids,
self.valid_network['id'],
self.management_network['id'])
self.assertRaises(cue_exceptions.VmBuildingException,
engines.run, flow, store=flow_store)
def test_create_cluster_max_retries_multi_node_multi_retry(self):
#Todo - This test is stubbed due to issues with the fixture timeout
# configuration, which interrupts the this test. The timeout needs
# to be solved and this test needs to be implemented to give full
# confidence in the retry feature.
pass
def test_create_cluster_overlimit(self):
vm_list = self.nova_client.servers.list()
port_list = self.neutron_client.list_ports()
@ -296,4 +469,4 @@ class CreateClusterTests(base.FunctionalTestCase):
def tearDown(self):
for vm_id in self.new_vm_list:
self.nova_client.servers.delete(vm_id)
super(CreateClusterTests, self).tearDown()
super(CreateClusterTests, self).tearDown()

View File

@ -84,9 +84,9 @@ class GetVmStatusTests(base.FunctionalTestCase):
"""
# configure custom vm_status list
nova.VmStatusDetails.set_vm_status(['ACTIVE',
'BUILDING',
'BUILDING',
'BUILDING'])
'BUILD',
'BUILD',
'BUILD'])
# create flow with "GetVmStatus" task
self.flow = linear_flow.Flow('wait for vm to become active',
@ -125,11 +125,11 @@ class GetVmStatusTests(base.FunctionalTestCase):
failure.
"""
# configure custom vm_status list
nova.VmStatusDetails.set_vm_status(['BUILDING',
'BUILDING',
'BUILDING',
'BUILDING',
'BUILDING',
nova.VmStatusDetails.set_vm_status(['BUILD',
'BUILD',
'BUILD',
'BUILD',
'BUILD',
'ERROR',
'ERROR'])

View File

@ -0,0 +1,51 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from cue.common import exception as cue_exceptions
import cue.taskflow.task.check_for_vm_status as check_for_vm_status
from cue.tests.unit import base
class TaskflowCheckVmStatusTest(base.UnitTestCase):
def setUp(self):
super(TaskflowCheckVmStatusTest, self).setUp()
def tearDown(self):
super(TaskflowCheckVmStatusTest, self).tearDown()
@mock.patch('time.sleep')
def test_check_vm_status_ms(self, mock_sleep):
check_task = check_for_vm_status.CheckForVmStatus(
name="check vm status",
details="waiting for ACTIVE VM status",
retry_delay_ms=1000,
)
check_task.revert('BUILD')
self.assertEqual(1.0, mock_sleep.call_args[0][0])
def test_check_vm_status_no_details(self):
check_task = check_for_vm_status.CheckForVmStatus(
name="check vm status",
)
try:
check_task.execute('BUILD')
except cue_exceptions.VmBuildingException as err:
self.assertEqual(cue_exceptions.VmBuildingException.message,
err.message)
else:
self.fail("Expected VmBuildingException")

View File

@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import taskflow.retry as tf_retry
from cue.common import exception as cue_exceptions
import cue.taskflow.retry.exception_times as retry
from cue.tests.unit import base
class TaskflowExceptionTimesTest(base.UnitTestCase):
def setUp(self):
super(TaskflowExceptionTimesTest, self).setUp()
def tearDown(self):
super(TaskflowExceptionTimesTest, self).tearDown()
def test_revert_action(self):
retry_exception_times = retry.ExceptionTimes(
revert_exception_list=[cue_exceptions.VmErrorException],
attempts=10,
revert_all=False)
self.assertEqual(tf_retry.REVERT, retry_exception_times._revert_action)
def test_revert_action_empty_exception_list(self):
retry_exception_times = retry.ExceptionTimes(
revert_exception_list=None,
attempts=10,
revert_all=False)
self.assertEqual([], retry_exception_times._revert_exception_list)

View File

@ -13,8 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from os_tasklib.common.assert_task import Assert # noqa
from os_tasklib.common.check_for import CheckFor # noqa
from os_tasklib.common.lambda_task import Lambda # noqa
from os_tasklib.common.map_task import Map # noqa
from os_tasklib.common.reduce_task import Reduce # noqa
from os_tasklib.common.assert_task import Assert # noqa
from os_tasklib.common.check_for import CheckFor # noqa
from os_tasklib.common.lambda_task import Lambda # noqa
from os_tasklib.common.map_task import Map # noqa
from os_tasklib.common.reduce_task import Reduce # noqa