Move Engine initialization into service start()

Currently we create the ThreadGroupManager and EngineListener
objects in the service constructor, which is not necessarily
going to work if multiple worker processes are specified in
the config file (which fork multiple workers after the constructor).

The ThreadGroupManager appears to work, even when it is created before
the fork, but this is due to some magic in the oslo ProcessLauncher
implementation which decouples parent/child use of the eventlet hub.

So instead, we move all service startup code into the start() method,
which is the entry point for services, triggered via the oslo Services
class run_service method:

- Don't create anything common to the workers in the constructor
- Move ThreadGroup and EngineListener creation into start()
- Create the periodic tasks from bin/heat-engine, which means
  the periodic tasks will only be created by the parent, not
  duplicated in every worker process.

These changes should mean we work correctly with both the ServiceLauncher
(num_engine_workers==1) and ProcessLauncher(num_engine_workers>1) oslo
abstractions, and solves the issues observed when running multiple
workers with the impl_qpid rpc_backend.

Change-Id: If3a11050a03660560a364dec871f85c4b56c1c25
Closes-Bug: #1321303
This commit is contained in:
Steven Hardy 2014-05-27 16:18:06 +01:00
parent 61bd5f30f2
commit 507555a585
5 changed files with 50 additions and 201 deletions

View File

@ -60,5 +60,8 @@ if __name__ == '__main__':
srv = engine.EngineService(cfg.CONF.host, rpc_api.ENGINE_TOPIC)
launcher = service.launch(srv, workers=cfg.CONF.num_engine_workers)
# We create the periodic tasks here, which mean they are created
# only in the parent process when num_engine_workers>1 is specified
srv.create_periodic_tasks()
notify.startup_notify(cfg.CONF.onready)
launcher.wait()

View File

@ -13,6 +13,7 @@
import functools
import json
import os
from oslo.config import cfg
import six
@ -101,7 +102,7 @@ class ThreadGroupManager(object):
:param cnxt: RPC context
:param stack: Stack to be operated on
:type stack: heat.engine.parser.Stack
:param engine_id: The UUID of the engine acquiring the lock
:param engine_id: The UUID of the engine/worker acquiring the lock
:param func: Callable to be invoked in sub-thread
:type func: function or instancemethod
:param args: Args to be passed to func
@ -278,17 +279,22 @@ class EngineService(service.Service):
def __init__(self, host, topic, manager=None):
super(EngineService, self).__init__(host, topic)
resources.initialise()
self.host = host
self.engine_id = stack_lock.StackLock.generate_engine_id()
self.thread_group_mgr = ThreadGroupManager()
# The following are initialized here, but assigned in start() which
# happens after the fork when spawning multiple worker processes
self.stack_watch = None
self.listener = None
self.engine_id = None
self.thread_group_mgr = None
def create_periodic_tasks(self):
LOG.debug("Starting periodic watch tasks pid=%s" % os.getpid())
# Note with multiple workers, the parent process hasn't called start()
# so we need to create a ThreadGroupManager here for the periodic tasks
if self.thread_group_mgr is None:
self.thread_group_mgr = ThreadGroupManager()
self.stack_watch = StackWatch(self.thread_group_mgr)
self.listener = EngineListener(host, self.engine_id,
self.thread_group_mgr)
LOG.debug("Starting listener for engine %s" % self.engine_id)
self.listener.start()
def start(self):
super(EngineService, self).start()
# Create a periodic_watcher_task per-stack
admin_context = context.get_admin_context()
@ -296,6 +302,16 @@ class EngineService(service.Service):
for s in stacks:
self.stack_watch.start_watch_task(s.id, admin_context)
def start(self):
self.thread_group_mgr = ThreadGroupManager()
self.engine_id = stack_lock.StackLock.generate_engine_id()
self.listener = EngineListener(self.host, self.engine_id,
self.thread_group_mgr)
LOG.debug("Starting listener for engine %s pid=%s, ppid=%s" %
(self.engine_id, os.getpid(), os.getppid()))
self.listener.start()
super(EngineService, self).start()
def stop(self):
# Stop rpc connection at first for preventing new requests
LOG.info(_("Attempting to stop engine service..."))
@ -524,8 +540,9 @@ class EngineService(service.Service):
if (stack.action in (stack.CREATE, stack.ADOPT)
and stack.status == stack.COMPLETE):
# Schedule a periodic watcher task for this stack
self.stack_watch.start_watch_task(stack.id, cnxt)
if self.stack_watch:
# Schedule a periodic watcher task for this stack
self.stack_watch.start_watch_task(stack.id, cnxt)
else:
LOG.warning(_("Stack create failed, status %s") % stack.status)

View File

@ -309,6 +309,9 @@ class DummyThreadGroup(object):
*args, **kwargs):
self.threads.append(callback)
def stop_timers(self):
pass
def add_thread(self, callback, *args, **kwargs):
self.threads.append(callback)
return self.pool.spawn(callback, *args, **kwargs)
@ -419,11 +422,8 @@ class StackServiceCreateUpdateDeleteTest(HeatTestCase):
def setUp(self):
super(StackServiceCreateUpdateDeleteTest, self).setUp()
self.ctx = utils.dummy_context()
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
self.man = service.EngineService('a-host', 'a-topic')
self.man.create_periodic_tasks()
def _test_stack_create(self, stack_name):
params = {'foo': 'bar'}
@ -1232,10 +1232,6 @@ class StackServiceUpdateNotSupportedTest(HeatTestCase):
def setUp(self):
super(StackServiceUpdateNotSupportedTest, self).setUp()
self.ctx = utils.dummy_context()
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
self.man = service.EngineService('a-host', 'a-topic')
def test_stack_update_during(self):
@ -1269,11 +1265,8 @@ class StackServiceSuspendResumeTest(HeatTestCase):
def setUp(self):
super(StackServiceSuspendResumeTest, self).setUp()
self.ctx = utils.dummy_context()
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
self.man = service.EngineService('a-host', 'a-topic')
self.man.create_periodic_tasks()
def test_stack_suspend(self):
stack_name = 'service_suspend_test_stack'
@ -1345,11 +1338,8 @@ class StackServiceAuthorizeTest(HeatTestCase):
super(StackServiceAuthorizeTest, self).setUp()
self.ctx = utils.dummy_context(tenant_id='stack_service_test_tenant')
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
self.eng = service.EngineService('a-host', 'a-topic')
self.eng.engine_id = 'engine-fake-uuid'
cfg.CONF.set_default('heat_stack_user_role', 'stack_user_role')
res._register_class('ResourceWithPropsType',
generic_rsrc.ResourceWithProps)
@ -1442,36 +1432,29 @@ class StackServiceTest(HeatTestCase):
super(StackServiceTest, self).setUp()
self.ctx = utils.dummy_context(tenant_id='stack_service_test_tenant')
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
self.eng = service.EngineService('a-host', 'a-topic')
self.eng.create_periodic_tasks()
self.eng.engine_id = 'engine-fake-uuid'
cfg.CONF.set_default('heat_stack_user_role', 'stack_user_role')
res._register_class('ResourceWithPropsType',
generic_rsrc.ResourceWithProps)
@mock.patch.object(service.StackWatch, 'start_watch_task')
@mock.patch.object(service.db_api, 'stack_get_all')
@mock.patch.object(service.service.Service, 'start')
def test_start_gets_all_stacks(self, mock_super_start, mock_stack_get_all):
mock_stack_get_all.return_value = []
self.eng.start()
mock_stack_get_all.assert_called_once_with(mock.ANY, tenant_safe=False)
@mock.patch.object(service.db_api, 'stack_get_all')
@mock.patch.object(service.service.Service, 'start')
def test_start_watches_all_stacks(self, mock_super_start, mock_get_all):
def test_start_watches_all_stacks(self, mock_super_start, mock_get_all,
start_watch_task):
s1 = mock.Mock(id=1)
s2 = mock.Mock(id=2)
mock_get_all.return_value = [s1, s2]
mock_watch = mock.Mock()
self.eng.stack_watch.start_watch_task = mock_watch
start_watch_task.return_value = None
self.eng.start()
calls = mock_watch.call_args_list
self.assertEqual(2, mock_watch.call_count)
self.eng.thread_group_mgr = None
self.eng.create_periodic_tasks()
mock_get_all.assert_called_once_with(mock.ANY, tenant_safe=False)
calls = start_watch_task.call_args_list
self.assertEqual(2, start_watch_task.call_count)
self.assertIn(mock.call(1, mock.ANY), calls)
self.assertIn(mock.call(2, mock.ANY), calls)
@ -2633,10 +2616,6 @@ class SoftwareConfigServiceTest(HeatTestCase):
def setUp(self):
super(SoftwareConfigServiceTest, self).setUp()
self.ctx = utils.dummy_context()
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
self.engine = service.EngineService('a-host', 'a-topic')
def _create_software_config(

View File

@ -199,10 +199,6 @@ class WaitCondMetadataUpdateTest(HeatTestCase):
def setUp(self):
super(WaitCondMetadataUpdateTest, self).setUp()
self.fc = fakes.FakeKeystoneClient()
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
self.man = service.EngineService('a-host', 'a-topic')
cfg.CONF.set_default('heat_waitcondition_server_url',
'http://server.test:8000/v1/waitcondition')

View File

@ -817,31 +817,19 @@ class validateTest(HeatTestCase):
def test_validate_ref_valid(self):
t = template_format.parse(test_template_ref % 'WikiDatabase')
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
engine = service.EngineService('a', 't')
res = dict(engine.validate_template(None, t, {}))
self.assertEqual('test.', res['Description'])
self.m.VerifyAll()
def test_validate_with_environment(self):
test_template = test_template_ref % 'WikiDatabase'
test_template = test_template.replace('AWS::EC2::Instance',
'My::Instance')
t = template_format.parse(test_template)
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
engine = service.EngineService('a', 't')
params = {'resource_registry': {'My::Instance': 'AWS::EC2::Instance'}}
res = dict(engine.validate_template(None, t, params))
self.assertEqual('test.', res['Description'])
self.m.VerifyAll()
def test_validate_hot_valid(self):
t = template_format.parse(
@ -852,58 +840,30 @@ class validateTest(HeatTestCase):
my_instance:
type: AWS::EC2::Instance
""")
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
engine = service.EngineService('a', 't')
res = dict(engine.validate_template(None, t, {}))
self.assertEqual('test.', res['Description'])
self.m.VerifyAll()
def test_validate_ref_invalid(self):
t = template_format.parse(test_template_ref % 'WikiDatabasez')
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
engine = service.EngineService('a', 't')
res = dict(engine.validate_template(None, t, {}))
self.assertNotEqual(res['Description'], 'Successfully validated')
self.m.VerifyAll()
def test_validate_findinmap_valid(self):
t = template_format.parse(test_template_findinmap_valid)
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
engine = service.EngineService('a', 't')
res = dict(engine.validate_template(None, t, {}))
self.assertEqual('test.', res['Description'])
self.m.VerifyAll()
def test_validate_findinmap_invalid(self):
t = template_format.parse(test_template_findinmap_invalid)
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
engine = service.EngineService('a', 't')
res = dict(engine.validate_template(None, t, {}))
self.assertNotEqual(res['Description'], 'Successfully validated')
self.m.VerifyAll()
def test_validate_parameters(self):
t = template_format.parse(test_template_ref % 'WikiDatabase')
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
engine = service.EngineService('a', 't')
res = dict(engine.validate_template(None, t, {}))
# Note: the assertion below does not expect a CFN dict of the parameter
@ -917,7 +877,6 @@ class validateTest(HeatTestCase):
'NoEcho': 'false',
'Label': 'KeyName'}}
self.assertEqual(expected, res['Parameters'])
self.m.VerifyAll()
def test_validate_hot_empty_parameters_valid(self):
t = template_format.parse(
@ -929,21 +888,12 @@ class validateTest(HeatTestCase):
my_instance:
type: AWS::EC2::Instance
""")
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
engine = service.EngineService('a', 't')
res = dict(engine.validate_template(None, t, {}))
self.assertEqual({}, res['Parameters'])
self.m.VerifyAll()
def test_validate_hot_parameter_label(self):
t = template_format.parse(test_template_hot_parameter_label)
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
engine = service.EngineService('a', 't')
res = dict(engine.validate_template(None, t, {}))
parameters = res['Parameters']
@ -955,14 +905,9 @@ class validateTest(HeatTestCase):
'NoEcho': 'false',
'Label': 'Nova KeyPair Name'}}
self.assertEqual(expected, parameters)
self.m.VerifyAll()
def test_validate_hot_no_parameter_label(self):
t = template_format.parse(test_template_hot_no_parameter_label)
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
engine = service.EngineService('a', 't')
res = dict(engine.validate_template(None, t, {}))
parameters = res['Parameters']
@ -974,14 +919,9 @@ class validateTest(HeatTestCase):
'NoEcho': 'false',
'Label': 'KeyName'}}
self.assertEqual(expected, parameters)
self.m.VerifyAll()
def test_validate_cfn_parameter_label(self):
t = template_format.parse(test_template_cfn_parameter_label)
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
engine = service.EngineService('a', 't')
res = dict(engine.validate_template(None, t, {}))
parameters = res['Parameters']
@ -993,7 +933,6 @@ class validateTest(HeatTestCase):
'NoEcho': 'false',
'Label': 'Nova KeyPair Name'}}
self.assertEqual(expected, parameters)
self.m.VerifyAll()
def test_validate_hot_empty_resources_valid(self):
t = template_format.parse(
@ -1002,16 +941,11 @@ class validateTest(HeatTestCase):
description: test.
resources:
""")
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
engine = service.EngineService('a', 't')
res = dict(engine.validate_template(None, t, {}))
expected = {"Description": "test.",
"Parameters": {}}
self.assertEqual(expected, res)
self.m.VerifyAll()
def test_validate_hot_empty_outputs_valid(self):
t = template_format.parse(
@ -1020,40 +954,25 @@ class validateTest(HeatTestCase):
description: test.
outputs:
""")
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
engine = service.EngineService('a', 't')
res = dict(engine.validate_template(None, t, {}))
expected = {"Description": "test.",
"Parameters": {}}
self.assertEqual(expected, res)
self.m.VerifyAll()
def test_validate_properties(self):
t = template_format.parse(test_template_invalid_property)
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
engine = service.EngineService('a', 't')
res = dict(engine.validate_template(None, t, {}))
self.assertEqual({'Error': 'Unknown Property UnknownProperty'}, res)
self.m.VerifyAll()
def test_invalid_resources(self):
t = template_format.parse(test_template_invalid_resources)
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
engine = service.EngineService('a', 't')
res = dict(engine.validate_template(None, t, {}))
self.assertEqual({'Error': 'Resources must contain Resource. '
'Found a [string] instead'},
res)
self.m.VerifyAll()
def test_invalid_section_cfn(self):
t = template_format.parse(
@ -1069,15 +988,10 @@ class validateTest(HeatTestCase):
}
""")
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
engine = service.EngineService('a', 't')
res = dict(engine.validate_template(None, t))
self.assertEqual({'Error': 'The template section is invalid: Output'},
res)
self.m.VerifyAll()
def test_invalid_section_hot(self):
t = template_format.parse(
@ -1089,79 +1003,49 @@ class validateTest(HeatTestCase):
output:
""")
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
engine = service.EngineService('a', 't')
res = dict(engine.validate_template(None, t))
self.assertEqual({'Error': 'The template section is invalid: output'},
res)
self.m.VerifyAll()
def test_unimplemented_property(self):
t = template_format.parse(test_template_unimplemented_property)
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
engine = service.EngineService('a', 't')
res = dict(engine.validate_template(None, t, {}))
self.assertEqual(
{'Error': 'Property SourceDestCheck not implemented yet'},
res)
self.m.VerifyAll()
def test_invalid_deletion_policy(self):
t = template_format.parse(test_template_invalid_deletion_policy)
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
engine = service.EngineService('a', 't')
res = dict(engine.validate_template(None, t, {}))
self.assertEqual({'Error': 'Invalid DeletionPolicy Destroy'}, res)
self.m.VerifyAll()
def test_snapshot_deletion_policy(self):
t = template_format.parse(test_template_snapshot_deletion_policy)
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
engine = service.EngineService('a', 't')
res = dict(engine.validate_template(None, t, {}))
self.assertEqual(
{'Error': 'Snapshot DeletionPolicy not supported'}, res)
self.m.VerifyAll()
@skipIf(try_import('cinderclient.v1.volume_backups') is None,
'unable to import volume_backups')
def test_volume_snapshot_deletion_policy(self):
t = template_format.parse(test_template_volume_snapshot)
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
engine = service.EngineService('a', 't')
res = dict(engine.validate_template(None, t, {}))
self.assertEqual({'Description': u'test.', 'Parameters': {}}, res)
self.m.VerifyAll()
def test_validate_template_without_resources(self):
hot_tpl = template_format.parse('''
heat_template_version: 2013-05-23
''')
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
engine = service.EngineService('a', 't')
res = dict(engine.validate_template(None, hot_tpl, {}))
expected = {'Description': 'No description', 'Parameters': {}}
self.assertEqual(expected, res)
self.m.VerifyAll()
def test_validate_template_with_invalid_resource_type(self):
hot_tpl = template_format.parse('''
@ -1179,15 +1063,10 @@ class validateTest(HeatTestCase):
foo: bar
''')
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
engine = service.EngineService('a', 't')
res = dict(engine.validate_template(None, hot_tpl, {}))
self.assertEqual({'Error': 'u\'"Type" is not a valid keyword '
'inside a resource definition\''}, res)
self.m.VerifyAll()
def test_validate_template_with_invalid_resource_properties(self):
hot_tpl = template_format.parse('''
@ -1205,15 +1084,10 @@ class validateTest(HeatTestCase):
foo: bar
''')
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
engine = service.EngineService('a', 't')
res = dict(engine.validate_template(None, hot_tpl, {}))
self.assertEqual({'Error': 'u\'"Properties" is not a valid keyword '
'inside a resource definition\''}, res)
self.m.VerifyAll()
def test_validate_template_with_invalid_resource_matadata(self):
hot_tpl = template_format.parse('''
@ -1231,15 +1105,10 @@ class validateTest(HeatTestCase):
foo: bar
''')
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
engine = service.EngineService('a', 't')
res = dict(engine.validate_template(None, hot_tpl, {}))
self.assertEqual({'Error': 'u\'"Metadata" is not a valid keyword '
'inside a resource definition\''}, res)
self.m.VerifyAll()
def test_validate_template_with_invalid_resource_depends_on(self):
hot_tpl = template_format.parse('''
@ -1257,15 +1126,10 @@ class validateTest(HeatTestCase):
foo: bar
''')
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
engine = service.EngineService('a', 't')
res = dict(engine.validate_template(None, hot_tpl, {}))
self.assertEqual({'Error': 'u\'"DependsOn" is not a valid keyword '
'inside a resource definition\''}, res)
self.m.VerifyAll()
def test_validate_template_with_invalid_resource_deletion_polciy(self):
hot_tpl = template_format.parse('''
@ -1283,16 +1147,11 @@ class validateTest(HeatTestCase):
foo: bar
''')
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
engine = service.EngineService('a', 't')
res = dict(engine.validate_template(None, hot_tpl, {}))
self.assertEqual({'Error': 'u\'"DeletionPolicy" is not a valid '
'keyword inside a resource definition\''},
res)
self.m.VerifyAll()
def test_validate_template_with_invalid_resource_update_policy(self):
hot_tpl = template_format.parse('''
@ -1310,16 +1169,11 @@ class validateTest(HeatTestCase):
foo: bar
''')
self.m.StubOutWithMock(service.EngineListener, 'start')
service.EngineListener.start().AndReturn(None)
self.m.ReplayAll()
engine = service.EngineService('a', 't')
res = dict(engine.validate_template(None, hot_tpl, {}))
self.assertEqual({'Error': 'u\'"UpdatePolicy" is not a valid '
'keyword inside a resource definition\''},
res)
self.m.VerifyAll()
def test_unregistered_key(self):
t = template_format.parse(test_unregistered_key)