Do not use VolumeTasks in AWS Instance create

Use volume progress objects instead (as in volume attachment resources).

Module volume_tasks and class scheduler.PollingTaskGroup have been removed,
as they are no longer used anywhere.

Change-Id: Iff3b92f28d89ad1fdfd22511bb531be463855ccd
Partial-Bug: #1393268
Pavlo Shchelokovskyy 2015-07-13 21:30:28 +03:00 committed by Steve Baker
parent 4d910cbaa2
commit 4f9371cfc0
5 changed files with 78 additions and 333 deletions
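
The pattern this change moves to is worth spelling out: instead of starting a PollingTaskGroup of VolumeAttachTask coroutines, handle_create now returns plain progress records as its cookie, and check_create_complete advances and inspects them on every engine poll. Below is a minimal standalone sketch of that shape; it only mirrors the diff that follows — the real classes are ServerCreateProgress and VolumeAttachProgress supplied by the nova and cinder client plugins, and the real polling is driven by the scheduler, not a plain loop.

import time

class AttachProgress(object):
    """Plain data holder; no coroutine or TaskRunner is created for it."""
    def __init__(self, srv_id, vol_id, device):
        self.srv_id, self.vol_id, self.device = srv_id, vol_id, device
        self.called = False      # the attach API call has been issued
        self.complete = False    # cinder has confirmed the attachment

def drive_create(resource, poll_interval=1):
    # Hypothetical driver loop for illustration only; in Heat the
    # scheduler/TaskRunner machinery does this polling with eventlet-friendly
    # sleeps. The cookie here would be (creator, tuple_of_AttachProgress).
    cookie = resource.handle_create()
    while not resource.check_create_complete(cookie):
        time.sleep(poll_interval)

All attachment state lives in the .called/.complete flags, which is exactly what the new Instance._attach_volumes method in the first hunk below walks through.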


@@ -21,12 +21,12 @@ from heat.common import exception
from heat.common.i18n import _
from heat.common.i18n import _LI
from heat.engine import attributes
from heat.engine.clients.os import cinder as cinder_cp
from heat.engine.clients.os import nova as nova_cp
from heat.engine import constraints
from heat.engine import properties
from heat.engine import resource
from heat.engine import scheduler
from heat.engine import volume_tasks as vol_task
cfg.CONF.import_opt('instance_user', 'heat.common.config')
cfg.CONF.import_opt('stack_scheduler_hints', 'heat.common.config')
@@ -577,23 +577,15 @@ class Instance(resource.Resource):
if server is not None:
self.resource_id_set(server.id)
if self.volumes():
attacher = scheduler.TaskRunner(self._attach_volumes_task())
else:
attacher = None
creator = nova_cp.ServerCreateProgress(server.id)
return creator, attacher
def _attach_volumes_task(self):
attach_tasks = (vol_task.VolumeAttachTask(self.stack,
self.resource_id,
volume_id,
device)
for volume_id, device in self.volumes())
return scheduler.PollingTaskGroup(attach_tasks)
attachers = []
for vol_id, device in self.volumes():
attachers.append(cinder_cp.VolumeAttachProgress(self.resource_id,
vol_id, device))
return creator, tuple(attachers)
def check_create_complete(self, cookie):
creator, attacher = cookie
creator, attachers = cookie
if not creator.complete:
creator.complete = self.client_plugin()._check_active(
@@ -601,17 +593,30 @@ class Instance(resource.Resource):
if creator.complete:
server = self.client_plugin().get_server(creator.server_id)
self._set_ipaddress(server.networks)
return attacher is None
# NOTE(pas-ha) small optimization,
# return True if there are no volumes to attach
# to save one check_create_complete call
return not len(attachers)
else:
return False
return self._check_volume_attached(attacher)
return self._attach_volumes(attachers)
def _check_volume_attached(self, volume_attach_task):
if not volume_attach_task.started():
volume_attach_task.start()
return volume_attach_task.done()
else:
return volume_attach_task.step()
def _attach_volumes(self, attachers):
for attacher in attachers:
if not attacher.called:
self.client_plugin().attach_volume(attacher.srv_id,
attacher.vol_id,
attacher.device)
attacher.called = True
return False
for attacher in attachers:
if not attacher.complete:
attacher.complete = self.client_plugin(
'cinder').check_attach_volume_complete(attacher.vol_id)
break
out = all(attacher.complete for attacher in attachers)
return out
def volumes(self):
"""


@@ -11,14 +11,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import functools
import itertools
import sys
import types
import eventlet
from oslo_log import log as logging
from oslo_utils import encodeutils
from oslo_utils import excutils
import six
from six import reraise as raise_
@@ -440,100 +437,3 @@ class DependencyTaskGroup(object):
"""
running = lambda k_r: k_r[0] in self._graph and k_r[1].started()
return six.moves.filter(running, six.iteritems(self._runners))
class PollingTaskGroup(object):
"""
A task which manages a group of subtasks.
When the task is started, all of its subtasks are also started. The task
completes when all subtasks are complete.
Once started, the subtasks are assumed to be only polling for completion
of an asynchronous operation, so no attempt is made to give them equal
scheduling slots.
"""
def __init__(self, tasks, name=None):
"""Initialise with a list of tasks."""
self._tasks = list(tasks)
if name is None:
name = ', '.join(task_description(t) for t in self._tasks)
self.name = name
@staticmethod
def _args(arg_lists):
"""Return a list containing the positional args for each subtask."""
return zip(*arg_lists)
@staticmethod
def _kwargs(kwarg_lists):
"""Return a list containing the keyword args for each subtask."""
keygroups = (six.moves.zip(itertools.repeat(name),
arglist)
for name, arglist in six.iteritems(kwarg_lists))
return [dict(kwargs) for kwargs in six.moves.zip(*keygroups)]
@classmethod
def from_task_with_args(cls, task, *arg_lists, **kwarg_lists):
"""
Return a new PollingTaskGroup where each subtask is identical except
for the arguments passed to it.
Each argument to use should be passed as a list (or iterable) of values
such that one is passed in the corresponding position for each subtask.
The number of subtasks spawned depends on the length of the argument
lists.
For example::
PollingTaskGroup.from_task_with_args(my_task,
[1, 2, 3],
alpha=['a', 'b', 'c'])
will start three TaskRunners that will run::
my_task(1, alpha='a')
my_task(2, alpha='b')
my_task(3, alpha='c')
respectively.
If multiple arguments are supplied, each list should be of the same
length. In the case of any discrepancy, the length of the shortest
argument list will be used, and any extra arguments discarded.
"""
args_list = cls._args(arg_lists)
kwargs_list = cls._kwargs(kwarg_lists)
if kwarg_lists and not arg_lists:
args_list = [[]] * len(kwargs_list)
elif arg_lists and not kwarg_lists:
kwargs_list = [{}] * len(args_list)
task_args = six.moves.zip(args_list, kwargs_list)
tasks = (functools.partial(task, *a, **kwa) for a, kwa in task_args)
return cls(tasks, name=task_description(task))
def __repr__(self):
"""Return a string representation of the task group."""
text = '%s(%s)' % (type(self).__name__, self.name)
return encodeutils.safe_encode(text)
def __call__(self):
"""Return a co-routine which runs the task group."""
runners = [TaskRunner(t) for t in self._tasks]
try:
for r in runners:
r.start()
while runners:
yield
runners = list(itertools.dropwhile(lambda r: r.step(),
runners))
except: # noqa
with excutils.save_and_reraise_exception():
for r in runners:
r.cancel()
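
For comparison, the class removed above drove a list of TaskRunners by repeatedly stepping them until all reported completion. A toy standalone model of that loop (illustrative only; the fake step() returning True once a task is done matches how the removed __call__ treats TaskRunner.step()):

import itertools

class FakeRunner(object):
    """Stand-in for scheduler.TaskRunner: step() returns True when finished."""
    def __init__(self, steps):
        self.remaining = steps

    def start(self):
        pass

    def step(self):
        self.remaining -= 1
        return self.remaining <= 0

def run_group(runners):
    # Same stepping pattern as the removed PollingTaskGroup.__call__, minus
    # the yield that let the scheduler sleep between passes: completed runners
    # at the head of the list are dropped on each pass.
    for r in runners:
        r.start()
    while runners:
        runners = list(itertools.dropwhile(lambda r: r.step(), runners))

run_group([FakeRunner(2), FakeRunner(3)])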


@@ -1,79 +0,0 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from heat.common.i18n import _
from heat.common.i18n import _LI
from heat.engine import resource
LOG = logging.getLogger(__name__)
class VolumeAttachTask(object):
"""A task for attaching a volume to a Nova server."""
def __init__(self, stack, server_id, volume_id, device):
"""
Initialise with the stack (for obtaining the clients), ID of the
server and volume, and the device name on the server.
"""
self.clients = stack.clients
self.server_id = server_id
self.volume_id = volume_id
self.device = device
self.attachment_id = None
def __str__(self):
"""Return a human-readable string description of the task."""
return 'Attaching Volume %s to Instance %s as %s' % (self.volume_id,
self.server_id,
self.device)
def __repr__(self):
"""Return a brief string description of the task."""
return '%s(%s -> %s [%s])' % (type(self).__name__,
self.volume_id,
self.server_id,
self.device)
def __call__(self):
"""Return a co-routine which runs the task."""
LOG.debug(str(self))
va = self.clients.client('nova').volumes.create_server_volume(
server_id=self.server_id,
volume_id=self.volume_id,
device=self.device)
self.attachment_id = va.id
yield
cinder = self.clients.client('cinder')
vol = cinder.volumes.get(self.volume_id)
while vol.status == 'available' or vol.status == 'attaching':
LOG.debug('%(name)s - volume status: %(status)s'
% {'name': str(self), 'status': vol.status})
yield
vol = cinder.volumes.get(self.volume_id)
if vol.status != 'in-use':
LOG.info(_LI("Attachment failed - volume %(vol)s "
"is in %(status)s status"),
{"vol": vol.id,
"status": vol.status})
raise resource.ResourceUnknownStatus(
resource_status=vol.status,
result=_('Volume attachment failed'))
LOG.info(_LI('%s - complete'), str(self))


@@ -65,6 +65,15 @@ wp_template = '''
"DeviceName": "vdb",
"Ebs": {"SnapshotId": "9ef5496e-7426-446a-bbc8-01f84d9c9972",
"DeleteOnTermination": "True"}
}],
"Volumes" : [
{
"Device": "/dev/vdc",
"VolumeId": "cccc"
},
{
"Device": "/dev/vdd",
"VolumeId": "dddd"
}]
}
}
@@ -95,20 +104,24 @@ class InstancesTest(common.HeatTestCase):
self.m.StubOutWithMock(glance.GlanceClientPlugin, 'get_image_id')
glance.GlanceClientPlugin.get_image_id(image_id).AndRaise(exp)
def _get_test_template(self, stack_name, image_id=None):
def _get_test_template(self, stack_name, image_id=None, volumes=False):
(tmpl, stack) = self._setup_test_stack(stack_name)
tmpl.t['Resources']['WebServer']['Properties'][
'ImageId'] = image_id or 'CentOS 5.2'
tmpl.t['Resources']['WebServer']['Properties'][
'InstanceType'] = '256 MB Server'
if not volumes:
tmpl.t['Resources']['WebServer']['Properties']['Volumes'] = []
return tmpl, stack
def _setup_test_instance(self, return_server, name, image_id=None,
stub_create=True, stub_complete=False):
stub_create=True, stub_complete=False,
volumes=False):
stack_name = '%s_s' % name
tmpl, self.stack = self._get_test_template(stack_name, image_id)
tmpl, self.stack = self._get_test_template(stack_name, image_id,
volumes=volumes)
resource_defns = tmpl.resource_definitions(self.stack)
instance = instances.Instance(name, resource_defns['WebServer'],
self.stack)
@@ -262,6 +275,7 @@ class InstancesTest(common.HeatTestCase):
self._mock_get_image_id_success('F17-x86_64-gold', 1)
self.stub_SnapshotConstraint_validate()
self.stub_VolumeConstraint_validate()
self.m.StubOutWithMock(nova.NovaClientPlugin, '_create')
nova.NovaClientPlugin._create().MultipleTimes().AndReturn(self.fc)
@@ -277,6 +291,7 @@ class InstancesTest(common.HeatTestCase):
bdm = [{'DeviceName': 'vdb'}]
wsp = tmpl.t['Resources']['WebServer']['Properties']
wsp['BlockDeviceMappings'] = bdm
wsp['Volumes'] = []
resource_defns = tmpl.resource_definitions(stack)
instance = instances.Instance('validate_without_Ebs',
resource_defns['WebServer'], stack)
@@ -301,6 +316,7 @@ class InstancesTest(common.HeatTestCase):
'Ebs': {'VolumeSize': '1'}}]
wsp = tmpl.t['Resources']['WebServer']['Properties']
wsp['BlockDeviceMappings'] = bdm
wsp['Volumes'] = []
resource_defns = tmpl.resource_definitions(stack)
instance = instances.Instance('validate_without_SnapshotId',
resource_defns['WebServer'], stack)
@@ -603,6 +619,7 @@ class InstancesTest(common.HeatTestCase):
nova.NovaClientPlugin._create().AndReturn(self.fc)
self._mock_get_image_id_success('1', 1)
self.stub_VolumeConstraint_validate()
self.stub_SnapshotConstraint_validate()
self.m.ReplayAll()
@@ -1049,6 +1066,7 @@ class InstancesTest(common.HeatTestCase):
scheduler.TaskRunner(instance.create)()
self.assertEqual((instance.CREATE, instance.COMPLETE), instance.state)
self.m.VerifyAll()
def _test_instance_status_suspend(self, name,
state=('CREATE', 'COMPLETE')):
@@ -1444,3 +1462,32 @@ class InstancesTest(common.HeatTestCase):
self.m.ReplayAll()
scheduler.TaskRunner(instance.create)()
self.m.VerifyAll()
def test_instance_create_with_volumes(self):
return_server = self.fc.servers.list()[1]
self.stub_VolumeConstraint_validate()
instance = self._setup_test_instance(return_server,
'with_volumes',
stub_complete=True,
volumes=True)
attach_mock = self.patchobject(nova.NovaClientPlugin, 'attach_volume',
side_effect=['cccc', 'dddd'])
check_attach_mock = self.patchobject(cinder.CinderClientPlugin,
'check_attach_volume_complete',
side_effect=[False, True,
False, True])
self.m.ReplayAll()
scheduler.TaskRunner(instance.create)()
self.assertEqual((instance.CREATE, instance.COMPLETE), instance.state)
self.assertEqual(2, attach_mock.call_count)
attach_mock.assert_has_calls([mock.call(instance.resource_id,
'cccc', '/dev/vdc'),
mock.call(instance.resource_id,
'dddd', '/dev/vdd')])
self.assertEqual(4, check_attach_mock.call_count)
check_attach_mock.assert_has_calls([mock.call('cccc'),
mock.call('cccc'),
mock.call('dddd'),
mock.call('dddd')])
self.m.VerifyAll()
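
Tracing the new _attach_volumes logic explains these expectations: each poll either issues one attach call or checks one pending attachment, so two volumes that each report "not attached" once before succeeding produce exactly two attach calls and four completion checks, in volume order. A rough standalone simulation of that interaction with unittest.mock (hypothetical helper names; this is not the Heat test fixture above):

import unittest.mock as mock

attach = mock.Mock(side_effect=['cccc', 'dddd'])
check = mock.Mock(side_effect=[False, True, False, True])

class Progress(object):
    def __init__(self, vol_id, device):
        self.vol_id, self.device = vol_id, device
        self.called = False
        self.complete = False

def attach_volumes(attachers):
    # Same shape as Instance._attach_volumes in the diff above.
    for att in attachers:
        if not att.called:
            attach('server-id', att.vol_id, att.device)
            att.called = True
            return False
    for att in attachers:
        if not att.complete:
            att.complete = check(att.vol_id)
            break
    return all(att.complete for att in attachers)

attachers = [Progress('cccc', '/dev/vdc'), Progress('dddd', '/dev/vdd')]
while not attach_volumes(attachers):
    pass

assert attach.call_count == 2 and check.call_count == 4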


@@ -34,134 +34,6 @@ class DummyTask(object):
pass
class PollingTaskGroupTest(common.HeatTestCase):
def setUp(self):
super(PollingTaskGroupTest, self).setUp()
self.addCleanup(self.m.VerifyAll)
def test_group(self):
tasks = [DummyTask() for i in range(3)]
for t in tasks:
self.m.StubOutWithMock(t, 'do_step')
self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
scheduler.TaskRunner._sleep(0).AndReturn(None)
for t in tasks:
t.do_step(1).AndReturn(None)
for t in tasks:
scheduler.TaskRunner._sleep(1).AndReturn(None)
t.do_step(2).AndReturn(None)
scheduler.TaskRunner._sleep(1).AndReturn(None)
t.do_step(3).AndReturn(None)
self.m.ReplayAll()
tg = scheduler.PollingTaskGroup(tasks)
scheduler.TaskRunner(tg)()
def test_kwargs(self):
input_kwargs = {'i': [0, 1, 2],
'i2': [0, 1, 4]}
output_kwargs = scheduler.PollingTaskGroup._kwargs(input_kwargs)
expected_kwargs = [{'i': 0, 'i2': 0},
{'i': 1, 'i2': 1},
{'i': 2, 'i2': 4}]
self.assertEqual(expected_kwargs, list(output_kwargs))
def test_kwargs_short(self):
input_kwargs = {'i': [0, 1, 2],
'i2': [0]}
output_kwargs = scheduler.PollingTaskGroup._kwargs(input_kwargs)
expected_kwargs = [{'i': 0, 'i2': 0}]
self.assertEqual(expected_kwargs, list(output_kwargs))
def test_no_kwargs(self):
output_kwargs = scheduler.PollingTaskGroup._kwargs({})
self.assertEqual([], list(output_kwargs))
def test_args(self):
input_args = ([0, 1, 2],
[0, 1, 4])
output_args = scheduler.PollingTaskGroup._args(input_args)
expected_args = [(0, 0), (1, 1), (2, 4)]
self.assertEqual(expected_args, list(output_args))
def test_args_short(self):
input_args = ([0, 1, 2],
[0])
output_args = scheduler.PollingTaskGroup._args(input_args)
expected_args = [(0, 0)]
self.assertEqual(expected_args, list(output_args))
def test_no_args(self):
output_args = scheduler.PollingTaskGroup._args([])
self.assertEqual([], list(output_args))
@contextlib.contextmanager
def _args_test(self, *arg_lists, **kwarg_lists):
dummy = DummyTask(1)
tg = scheduler.PollingTaskGroup.from_task_with_args(dummy,
*arg_lists,
**kwarg_lists)
self.m.StubOutWithMock(dummy, 'do_step')
yield dummy
self.m.ReplayAll()
scheduler.TaskRunner(tg)(wait_time=None)
def test_with_all_args(self):
with self._args_test([0, 1, 2], [0, 1, 8],
i=[0, 1, 2], i2=[0, 1, 4]) as dummy:
for i in range(3):
dummy.do_step(1, i, i * i * i, i=i, i2=i * i)
def test_with_short_args(self):
with self._args_test([0, 1, 2], [0, 1],
i=[0, 1, 2], i2=[0, 1, 4]) as dummy:
for i in range(2):
dummy.do_step(1, i, i * i, i=i, i2=i * i)
def test_with_short_kwargs(self):
with self._args_test([0, 1, 2], [0, 1, 8],
i=[0, 1], i2=[0, 1, 4]) as dummy:
for i in range(2):
dummy.do_step(1, i, i * i, i=i, i2=i * i)
def test_with_empty_args(self):
with self._args_test([], i=[0, 1, 2], i2=[0, 1, 4]):
pass
def test_with_empty_kwargs(self):
with self._args_test([0, 1, 2], [0, 1, 8], i=[]):
pass
def test_with_no_args(self):
with self._args_test(i=[0, 1, 2], i2=[0, 1, 4]) as dummy:
for i in range(3):
dummy.do_step(1, i=i, i2=i * i)
def test_with_no_kwargs(self):
with self._args_test([0, 1, 2], [0, 1, 4]) as dummy:
for i in range(3):
dummy.do_step(1, i, i * i)
class ExceptionGroupTest(common.HeatTestCase):
def test_contains_exceptions(self):