Allow resources to be updated in parallel

Turn Resource.update() into a coroutine.

Change-Id: I625a2d81095cc28161f8e2342121bfe1939611e3
This commit is contained in:
Zane Bitter 2013-08-30 20:45:54 +02:00
parent 4da718ce54
commit 5532be0ecf
12 changed files with 71 additions and 61 deletions

View File

@ -420,7 +420,11 @@ class Resource(object):
tmpl_diff = self.update_template_diff(after, before)
prop_diff = self.update_template_diff_properties(after, before)
if callable(getattr(self, 'handle_update', None)):
result = self.handle_update(after, tmpl_diff, prop_diff)
handle_data = self.handle_update(after, tmpl_diff, prop_diff)
yield
if callable(getattr(self, 'check_update_complete', None)):
while not self.check_update_complete(handle_data):
yield
except UpdateReplace:
logger.debug("Resource %s update requires replacement" % self.name)
raise

View File

@ -23,6 +23,7 @@ from heat.openstack.common import log as logging
from heat.openstack.common import timeutils
from heat.engine.properties import Properties
from heat.engine import properties
from heat.engine import scheduler
from heat.engine import stack_resource
logger = logging.getLogger(__name__)
@ -243,7 +244,7 @@ class InstanceGroup(stack_resource.StackResource):
(lb,))
resolved_snippet = self.stack.resolve_static_data(
lb_resource.json_snippet)
lb_resource.update(resolved_snippet)
scheduler.TaskRunner(lb_resource.update, resolved_snippet)()
def FnGetRefId(self):
return unicode(self.name)

View File

@ -213,7 +213,7 @@ class AutoScalingTest(HeatTestCase):
# Reduce the min size to 0, should complete without adjusting
update_snippet = copy.deepcopy(rsrc.parsed_template())
update_snippet['Properties']['MinSize'] = '0'
self.assertEqual(None, rsrc.update(update_snippet))
scheduler.TaskRunner(rsrc.update, update_snippet)()
self.assertEqual(['WebServerGroup-0'], rsrc.get_instance_names())
# trigger adjustment to reduce to 0, there should be no more instances
@ -241,8 +241,8 @@ class AutoScalingTest(HeatTestCase):
self.assertEqual(['WebServerGroup-0'], rsrc.get_instance_names())
update_snippet = copy.deepcopy(rsrc.parsed_template())
update_snippet['Properties']['AvailabilityZones'] = ['foo']
self.assertRaises(resource.UpdateReplace,
rsrc.update, update_snippet)
updater = scheduler.TaskRunner(rsrc.update, update_snippet)
self.assertRaises(resource.UpdateReplace, updater)
rsrc.delete()
self.m.VerifyAll()
@ -503,7 +503,7 @@ class AutoScalingTest(HeatTestCase):
# Reduce the max size to 2, should complete without adjusting
update_snippet = copy.deepcopy(rsrc.parsed_template())
update_snippet['Properties']['MaxSize'] = '2'
self.assertEqual(None, rsrc.update(update_snippet))
scheduler.TaskRunner(rsrc.update, update_snippet)()
self.assertEqual(['WebServerGroup-0'], rsrc.get_instance_names())
self.assertEqual('2', rsrc.properties['MaxSize'])
@ -534,7 +534,7 @@ class AutoScalingTest(HeatTestCase):
update_snippet = copy.deepcopy(rsrc.parsed_template())
update_snippet['Properties']['MinSize'] = '2'
self.assertEqual(None, rsrc.update(update_snippet))
scheduler.TaskRunner(rsrc.update, update_snippet)()
self.assertEqual(['WebServerGroup-0', 'WebServerGroup-1'],
rsrc.get_instance_names())
self.assertEqual('2', rsrc.properties['MinSize'])
@ -565,7 +565,7 @@ class AutoScalingTest(HeatTestCase):
update_snippet = copy.deepcopy(rsrc.parsed_template())
update_snippet['Properties']['DesiredCapacity'] = '2'
self.assertEqual(None, rsrc.update(update_snippet))
scheduler.TaskRunner(rsrc.update, update_snippet)()
self.assertEqual(['WebServerGroup-0', 'WebServerGroup-1'],
rsrc.get_instance_names())
@ -593,7 +593,7 @@ class AutoScalingTest(HeatTestCase):
# have no effect, it's an optional parameter
update_snippet = copy.deepcopy(rsrc.parsed_template())
del(update_snippet['Properties']['DesiredCapacity'])
self.assertEqual(None, rsrc.update(update_snippet))
scheduler.TaskRunner(rsrc.update, update_snippet)()
self.assertEqual(['WebServerGroup-0', 'WebServerGroup-1'],
rsrc.get_instance_names())
@ -619,7 +619,7 @@ class AutoScalingTest(HeatTestCase):
self.assertEqual(['WebServerGroup-0'], rsrc.get_instance_names())
update_snippet = copy.deepcopy(rsrc.parsed_template())
update_snippet['Properties']['Cooldown'] = '61'
self.assertEqual(None, rsrc.update(update_snippet))
scheduler.TaskRunner(rsrc.update, update_snippet)()
self.assertEqual('61', rsrc.properties['Cooldown'])
rsrc.delete()
@ -660,7 +660,7 @@ class AutoScalingTest(HeatTestCase):
self.assertEqual(['WebServerGroup-0'], rsrc.get_instance_names())
update_snippet = copy.deepcopy(rsrc.parsed_template())
update_snippet['Properties']['Cooldown'] = '61'
self.assertEqual(None, rsrc.update(update_snippet))
scheduler.TaskRunner(rsrc.update, update_snippet)()
rsrc.delete()
self.m.VerifyAll()
@ -1380,7 +1380,7 @@ class AutoScalingTest(HeatTestCase):
# Update scaling policy
update_snippet = copy.deepcopy(up_policy.parsed_template())
update_snippet['Properties']['ScalingAdjustment'] = '2'
self.assertEqual(None, up_policy.update(update_snippet))
scheduler.TaskRunner(up_policy.update, update_snippet)()
self.assertEqual('2',
up_policy.properties['ScalingAdjustment'])

View File

@ -164,7 +164,7 @@ class CeilometerAlarmTest(HeatTestCase):
snippet['Properties']['alarm_actions'] = []
snippet['Properties']['ok_actions'] = ['signal_handler']
self.assertEqual(None, rsrc.update(snippet))
scheduler.TaskRunner(rsrc.update, snippet)()
self.m.VerifyAll()
@ -189,8 +189,8 @@ class CeilometerAlarmTest(HeatTestCase):
snippet = copy.deepcopy(rsrc.parsed_template())
snippet['Properties']['counter_name'] = 'temp'
self.assertRaises(resource.UpdateReplace,
rsrc.update, snippet)
updater = scheduler.TaskRunner(rsrc.update, snippet)
self.assertRaises(resource.UpdateReplace, updater)
self.m.VerifyAll()

View File

@ -90,7 +90,7 @@ class CloudWatchAlarmTest(HeatTestCase):
snippet['Properties']['Statistic'] = 'Maximum'
snippet['Properties']['Threshold'] = '39'
self.assertEqual(None, rsrc.update(snippet))
scheduler.TaskRunner(rsrc.update, snippet)()
rsrc.delete()
self.m.VerifyAll()
@ -116,8 +116,8 @@ class CloudWatchAlarmTest(HeatTestCase):
snippet = copy.deepcopy(rsrc.parsed_template())
snippet['Properties']['MetricName'] = 'temp'
self.assertRaises(resource.UpdateReplace,
rsrc.update, snippet)
updater = scheduler.TaskRunner(rsrc.update, snippet)
self.assertRaises(resource.UpdateReplace, updater)
rsrc.delete()
self.m.VerifyAll()

View File

@ -307,7 +307,7 @@ class InstancesTest(HeatTestCase):
update_template = copy.deepcopy(instance.t)
update_template['Metadata'] = {'test': 123}
self.assertEqual(None, instance.update(update_template))
scheduler.TaskRunner(instance.update, update_template)()
self.assertEqual(instance.metadata, {'test': 123})
def test_instance_update_instance_type(self):
@ -337,7 +337,7 @@ class InstancesTest(HeatTestCase):
body={'confirmResize': None}).AndReturn((202, None))
self.m.ReplayAll()
self.assertEqual(None, instance.update(update_template))
scheduler.TaskRunner(instance.update, update_template)()
self.assertEqual(instance.state, (instance.UPDATE, instance.COMPLETE))
self.m.VerifyAll()
@ -366,8 +366,8 @@ class InstancesTest(HeatTestCase):
body={'resize': {'flavorRef': 2}}).AndReturn((202, None))
self.m.ReplayAll()
error = self.assertRaises(exception.ResourceFailure,
instance.update, update_template)
updater = scheduler.TaskRunner(instance.update, update_template)
error = self.assertRaises(exception.ResourceFailure, updater)
self.assertEqual(
"Error: Resizing to 'm1.small' failed, status 'ACTIVE'",
str(error))
@ -381,8 +381,8 @@ class InstancesTest(HeatTestCase):
update_template = copy.deepcopy(instance.t)
update_template['Notallowed'] = {'test': 123}
self.assertRaises(resource.UpdateReplace,
instance.update, update_template)
updater = scheduler.TaskRunner(instance.update, update_template)
self.assertRaises(resource.UpdateReplace, updater)
def test_instance_update_properties(self):
return_server = self.fc.servers.list()[1]
@ -391,8 +391,8 @@ class InstancesTest(HeatTestCase):
update_template = copy.deepcopy(instance.t)
update_template['Properties']['KeyName'] = 'mustreplace'
self.assertRaises(resource.UpdateReplace,
instance.update, update_template)
updater = scheduler.TaskRunner(instance.update, update_template)
self.assertRaises(resource.UpdateReplace, updater)
def test_instance_status_build(self):
return_server = self.fc.servers.list()[0]

View File

@ -254,8 +254,8 @@ class InstanceGroupTest(HeatTestCase):
update_snippet['Properties']['Size'] = '2'
tmpl_diff = {'Properties': {'Size': '2'}}
prop_diff = {'Size': '2'}
self.assertRaises(exception.ResourceFailure,
rsrc.update, update_snippet)
updater = scheduler.TaskRunner(rsrc.update, update_snippet)
self.assertRaises(exception.ResourceFailure, updater)
self.assertEqual((rsrc.UPDATE, rsrc.FAILED), rsrc.state)
@ -281,8 +281,8 @@ class InstanceGroupTest(HeatTestCase):
update_snippet = copy.deepcopy(rsrc.parsed_template())
update_snippet['Metadata'] = 'notallowedforupdate'
self.assertRaises(resource.UpdateReplace,
rsrc.update, update_snippet)
updater = scheduler.TaskRunner(rsrc.update, update_snippet)
self.assertRaises(resource.UpdateReplace, updater)
rsrc.delete()
self.m.VerifyAll()
@ -302,8 +302,8 @@ class InstanceGroupTest(HeatTestCase):
update_snippet = copy.deepcopy(rsrc.parsed_template())
update_snippet['Properties']['AvailabilityZones'] = ['wibble']
self.assertRaises(resource.UpdateReplace,
rsrc.update, update_snippet)
updater = scheduler.TaskRunner(rsrc.update, update_snippet)
self.assertRaises(resource.UpdateReplace, updater)
rsrc.delete()
self.m.VerifyAll()

View File

@ -213,7 +213,7 @@ class FirewallTest(HeatTestCase):
update_template = copy.deepcopy(rsrc.t)
update_template['Properties']['admin_state_up'] = False
self.assertEqual(None, rsrc.update(update_template))
scheduler.TaskRunner(rsrc.update, update_template)()
self.m.VerifyAll()
@ -343,7 +343,7 @@ class FirewallPolicyTest(HeatTestCase):
update_template = copy.deepcopy(rsrc.t)
update_template['Properties']['firewall_rules'] = ['3', '4']
self.assertEqual(None, rsrc.update(update_template))
scheduler.TaskRunner(rsrc.update, update_template)()
self.m.VerifyAll()
@ -475,6 +475,6 @@ class FirewallRuleTest(HeatTestCase):
update_template = copy.deepcopy(rsrc.t)
update_template['Properties']['protocol'] = 'icmp'
self.assertEqual(None, rsrc.update(update_template))
scheduler.TaskRunner(rsrc.update, update_template)()
self.m.VerifyAll()

View File

@ -212,7 +212,7 @@ class HealthMonitorTest(HeatTestCase):
update_template = copy.deepcopy(rsrc.t)
update_template['Properties']['delay'] = 10
self.assertEqual(None, rsrc.update(update_template))
scheduler.TaskRunner(rsrc.update, update_template)()
self.m.VerifyAll()
@ -490,7 +490,7 @@ class PoolTest(HeatTestCase):
update_template = copy.deepcopy(rsrc.t)
update_template['Properties']['admin_state_up'] = False
self.assertEqual(None, rsrc.update(update_template))
scheduler.TaskRunner(rsrc.update, update_template)()
self.m.VerifyAll()
@ -533,7 +533,7 @@ class PoolTest(HeatTestCase):
update_template = copy.deepcopy(rsrc.t)
update_template['Properties']['monitors'] = ['mon123', 'mon789']
self.assertEqual(None, rsrc.update(update_template))
scheduler.TaskRunner(rsrc.update, update_template)()
self.m.VerifyAll()
@ -588,7 +588,7 @@ class LoadBalancerTest(HeatTestCase):
update_template = copy.deepcopy(rsrc.t)
update_template['Properties']['members'] = ['5678']
self.assertEqual(None, rsrc.update(update_template))
scheduler.TaskRunner(rsrc.update, update_template)()
self.m.VerifyAll()
def test_update_missing_member(self):
@ -602,7 +602,7 @@ class LoadBalancerTest(HeatTestCase):
update_template = copy.deepcopy(rsrc.t)
update_template['Properties']['members'] = []
self.assertEqual(None, rsrc.update(update_template))
scheduler.TaskRunner(rsrc.update, update_template)()
self.assertEqual((rsrc.UPDATE, rsrc.COMPLETE), rsrc.state)
self.m.VerifyAll()

View File

@ -313,7 +313,7 @@ class RackspaceCloudServerTest(HeatTestCase):
self.m.ReplayAll()
update_template = copy.deepcopy(cs.t)
update_template['Metadata'] = {'test': 123}
self.assertEqual(None, cs.update(update_template))
scheduler.TaskRunner(cs.update, update_template)()
self.assertEqual(cs.metadata, {'test': 123})
def test_cs_update_replace(self):
@ -322,7 +322,8 @@ class RackspaceCloudServerTest(HeatTestCase):
update_template = copy.deepcopy(cs.t)
update_template['Notallowed'] = {'test': 123}
self.assertRaises(resource.UpdateReplace, cs.update, update_template)
updater = scheduler.TaskRunner(cs.update, update_template)
self.assertRaises(resource.UpdateReplace, updater)
def test_cs_update_properties(self):
return_server = self.fc.servers.list()[1]
@ -330,8 +331,8 @@ class RackspaceCloudServerTest(HeatTestCase):
update_template = copy.deepcopy(cs.t)
update_template['Properties']['user_data'] = 'mustreplace'
self.assertRaises(resource.UpdateReplace,
cs.update, update_template)
updater = scheduler.TaskRunner(cs.update, update_template)
self.assertRaises(resource.UpdateReplace, updater)
def test_cs_status_build(self):
return_server = self.fc.servers.list()[0]

View File

@ -338,7 +338,7 @@ class ResourceTest(HeatTestCase):
utmpl, tmpl_diff, prop_diff).AndReturn(None)
self.m.ReplayAll()
self.assertEqual(None, res.update(utmpl))
scheduler.TaskRunner(res.update, utmpl)()
self.assertEqual((res.UPDATE, res.COMPLETE), res.state)
self.m.VerifyAll()
@ -358,7 +358,8 @@ class ResourceTest(HeatTestCase):
utmpl, tmpl_diff, prop_diff).AndRaise(resource.UpdateReplace())
self.m.ReplayAll()
# should be re-raised so parser.Stack can handle replacement
self.assertRaises(resource.UpdateReplace, res.update, utmpl)
updater = scheduler.TaskRunner(res.update, utmpl)
self.assertRaises(resource.UpdateReplace, updater)
self.m.VerifyAll()
def test_update_fail_missing_req_prop(self):
@ -372,7 +373,8 @@ class ResourceTest(HeatTestCase):
utmpl = {'Type': 'GenericResourceType', 'Properties': {}}
self.assertRaises(exception.ResourceFailure, res.update, utmpl)
updater = scheduler.TaskRunner(res.update, utmpl)
self.assertRaises(exception.ResourceFailure, updater)
self.assertEqual((res.UPDATE, res.FAILED), res.state)
def test_update_fail_prop_typo(self):
@ -385,7 +387,8 @@ class ResourceTest(HeatTestCase):
utmpl = {'Type': 'GenericResourceType', 'Properties': {'Food': 'xyz'}}
self.assertRaises(exception.ResourceFailure, res.update, utmpl)
updater = scheduler.TaskRunner(res.update, utmpl)
self.assertRaises(exception.ResourceFailure, updater)
self.assertEqual((res.UPDATE, res.FAILED), res.state)
def test_update_not_implemented(self):
@ -403,7 +406,8 @@ class ResourceTest(HeatTestCase):
generic_rsrc.ResourceWithProps.handle_update(
utmpl, tmpl_diff, prop_diff).AndRaise(NotImplemented)
self.m.ReplayAll()
self.assertRaises(exception.ResourceFailure, res.update, utmpl)
updater = scheduler.TaskRunner(res.update, utmpl)
self.assertRaises(exception.ResourceFailure, updater)
self.assertEqual((res.UPDATE, res.FAILED), res.state)
self.m.VerifyAll()

View File

@ -351,7 +351,7 @@ class ServersTest(HeatTestCase):
update_template = copy.deepcopy(server.t)
update_template['Metadata'] = {'test': 123}
self.assertEqual(None, server.update(update_template))
scheduler.TaskRunner(server.update, update_template)()
self.assertEqual(server.metadata, {'test': 123})
server.t['Metadata'] = {'test': 456}
@ -385,7 +385,7 @@ class ServersTest(HeatTestCase):
body={'confirmResize': None}).AndReturn((202, None))
self.m.ReplayAll()
self.assertEqual(None, server.update(update_template))
scheduler.TaskRunner(server.update, update_template)()
self.assertEqual(server.state, (server.UPDATE, server.COMPLETE))
self.m.VerifyAll()
@ -414,8 +414,8 @@ class ServersTest(HeatTestCase):
body={'resize': {'flavorRef': 2}}).AndReturn((202, None))
self.m.ReplayAll()
error = self.assertRaises(exception.ResourceFailure,
server.update, update_template)
updater = scheduler.TaskRunner(server.update, update_template)
error = self.assertRaises(exception.ResourceFailure, updater)
self.assertEqual(
"Error: Resizing to 'm1.small' failed, status 'ACTIVE'",
str(error))
@ -433,8 +433,8 @@ class ServersTest(HeatTestCase):
update_template = copy.deepcopy(server.t)
update_template['Properties']['flavor'] = 'm1.smigish'
self.assertRaises(resource.UpdateReplace,
server.update, update_template)
updater = scheduler.TaskRunner(server.update, update_template)
self.assertRaises(resource.UpdateReplace, updater)
def test_server_update_server_flavor_policy_update(self):
stack_name = 'test_server_update_flavor_replace'
@ -449,8 +449,8 @@ class ServersTest(HeatTestCase):
# update
update_template['Properties']['flavor_update_policy'] = 'REPLACE'
update_template['Properties']['flavor'] = 'm1.smigish'
self.assertRaises(resource.UpdateReplace,
server.update, update_template)
updater = scheduler.TaskRunner(server.update, update_template)
self.assertRaises(resource.UpdateReplace, updater)
def test_server_update_replace(self):
return_server = self.fc.servers.list()[1]
@ -459,8 +459,8 @@ class ServersTest(HeatTestCase):
update_template = copy.deepcopy(server.t)
update_template['Notallowed'] = {'test': 123}
self.assertRaises(resource.UpdateReplace,
server.update, update_template)
updater = scheduler.TaskRunner(server.update, update_template)
self.assertRaises(resource.UpdateReplace, updater)
def test_server_update_properties(self):
return_server = self.fc.servers.list()[1]
@ -469,8 +469,8 @@ class ServersTest(HeatTestCase):
update_template = copy.deepcopy(server.t)
update_template['Properties']['key_name'] = 'mustreplace'
self.assertRaises(resource.UpdateReplace,
server.update, update_template)
updater = scheduler.TaskRunner(server.update, update_template)
self.assertRaises(resource.UpdateReplace, updater)
def test_server_status_build(self):
return_server = self.fc.servers.list()[0]