Merge "Allow an in-progress stack to be deleted"

This commit is contained in:
Jenkins 2014-02-11 11:28:04 +00:00 committed by Gerrit Code Review
commit 2a2cf5231d
5 changed files with 179 additions and 6 deletions

View File

@ -72,6 +72,7 @@ class FaultWrapper(wsgi.Middleware):
'InvalidTemplateReference': webob.exc.HTTPBadRequest,
'UnknownUserParameter': webob.exc.HTTPBadRequest,
'RevertFailed': webob.exc.HTTPInternalServerError,
'StopActionFailed': webob.exc.HTTPInternalServerError,
'ServerBuildFailed': webob.exc.HTTPInternalServerError,
'NotSupported': webob.exc.HTTPBadRequest,
'MissingCredentialError': webob.exc.HTTPBadRequest,

View File

@ -331,3 +331,8 @@ class ActionInProgress(HeatException):
class SoftwareConfigMissing(HeatException):
    """Error reporting that a software config could not be found.

    The message is formatted with ``software_config_id``.
    """
    msg_fmt = _(
        "The config (%(software_config_id)s) could not be found.")
class StopActionFailed(HeatException):
    """Error reporting that a stack could not be stopped on another engine.

    The message is formatted with ``stack_name`` and ``engine_id``.
    """
    msg_fmt = _("Failed to stop stack (%(stack_name)s) "
                "on other engine (%(engine_id)s)")

View File

@ -19,6 +19,7 @@ import json
from oslo.config import cfg
import webob
cfg.CONF.import_opt('engine_life_check_timeout', 'heat.common.config')
cfg.CONF.import_opt('max_resources_per_stack', 'heat.common.config')
cfg.CONF.import_opt('max_stacks_per_tenant', 'heat.common.config')
@ -44,12 +45,12 @@ from heat.engine import watchrule
from heat.openstack.common import log as logging
from heat.openstack.common import threadgroup
from heat.openstack.common.gettextutils import _
from heat.openstack.common.rpc import service
from heat.openstack.common.rpc import common as rpc_common
from heat.openstack.common.rpc import proxy
from heat.openstack.common.rpc import service
from heat.openstack.common import excutils
from heat.openstack.common import uuidutils
logger = logging.getLogger(__name__)
@ -147,12 +148,24 @@ class ThreadGroupManager(object):
self.groups[stack_id].add_timer(cfg.CONF.periodic_interval,
func, *args, **kwargs)
def stop(self, stack_id):
    '''
    Stop any active threads on a stack.

    Does nothing when no thread group is registered for ``stack_id``.
    Otherwise the group is stopped first and only then dropped from
    ``self.groups``.
    '''
    if stack_id not in self.groups:
        return
    group = self.groups[stack_id]
    group.stop()
    del self.groups[stack_id]
class EngineListener(service.Service):
'''
Listen on an AMQP queue while a stack action is in-progress and
respond to stack-related questions. Used for multi-engine support.
Listen on an AMQP queue named for the engine. Allows individual
engines to communicate with each other for multi-engine support.
'''
def __init__(self, host, engine_id, thread_group_mgr):
    '''
    Set up the listener service for one engine.

    :param host: passed through to the parent service constructor.
    :param engine_id: passed to the parent constructor (in the topic
        position) and also kept on the instance.
    :param thread_group_mgr: object whose ``stop(stack_id)`` is called
        by ``stop_stack``.
    '''
    super(EngineListener, self).__init__(host, engine_id)
    self.engine_id = engine_id
    self.thread_group_mgr = thread_group_mgr
def listening(self, ctxt):
'''
Respond affirmatively to confirm that the engine performing the
@ -160,6 +173,11 @@ class EngineListener(service.Service):
'''
return True
def stop_stack(self, ctxt, stack_identity):
    '''
    Stop any active threads on a stack.

    :param ctxt: RPC context; not used by this method.
    :param stack_identity: mapping that must contain a ``'stack_id'``
        key identifying the stack to stop.
    '''
    target_stack_id = stack_identity['stack_id']
    self.thread_group_mgr.stop(target_stack_id)
class EngineService(service.Service):
"""
@ -180,7 +198,8 @@ class EngineService(service.Service):
self.engine_id = stack_lock.StackLock.generate_engine_id()
self.thread_group_mgr = ThreadGroupManager()
self.listener = EngineListener(host, self.engine_id)
self.listener = EngineListener(host, self.engine_id,
self.thread_group_mgr)
logger.debug(_("Starting listener for engine %s") % self.engine_id)
self.listener.start()
@ -542,10 +561,52 @@ class EngineService(service.Service):
:param cnxt: RPC context.
:param stack_identity: Name of the stack you want to delete.
"""
def remote_stop(lock_engine_id):
    """
    Ask the engine holding the stack lock to stop the stack's threads.

    Sends a ``stop_stack`` RPC call on the topic named after
    ``lock_engine_id`` and waits up to ``engine_life_check_timeout``.

    :returns: None when the call returns, False when it times out.
        The caller distinguishes the two with ``is None``.
    """
    engine_proxy = proxy.RpcProxy(lock_engine_id, "1.0")
    stop_msg = engine_proxy.make_msg("stop_stack",
                                     stack_identity=stack_identity)
    wait_secs = cfg.CONF.engine_life_check_timeout
    try:
        engine_proxy.call(cnxt, stop_msg, topic=lock_engine_id,
                          timeout=wait_secs)
    except rpc_common.Timeout:
        return False
    return None
st = self._get_stack(cnxt, stack_identity)
logger.info(_('Deleting stack %s') % st.name)
stack = parser.Stack.load(cnxt, stack=st)
logger.info(_('deleting stack %s') % st.name)
lock = stack_lock.StackLock(cnxt, stack, self.engine_id)
acquire_result = lock.try_acquire()
if acquire_result is None:
self.thread_group_mgr.start_with_acquired_lock(stack, lock,
stack.delete)
return
elif acquire_result == self.engine_id: # Current engine has the lock
self.thread_group_mgr.stop(stack.id)
# If the lock isn't released here, then the call to
# start_with_lock below will raise an ActionInProgress
# exception. Ideally, we wouldn't be calling another
# release() here, since it should be called as soon as the
# ThreadGroup is stopped. But apparently there's a race
# between release() and the next call to lock.acquire().
db_api.stack_lock_release(stack.id, self.engine_id)
else: # Another engine has the lock
other_engine_id = acquire_result
stop_result = remote_stop(other_engine_id)
if stop_result is None:
logger.debug(_("Successfully stopped remote task on engine %s")
% other_engine_id)
else:
raise exception.StopActionFailed(stack_name=stack.name,
engine_id=other_engine_id)
# There may be additional resources that we don't know about
# if an update was in-progress when the stack was stopped, so
# reload the stack from the database.
st = self._get_stack(cnxt, stack_identity)
stack = parser.Stack.load(cnxt, stack=st)
self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,

View File

@ -48,6 +48,13 @@ class StackLock(object):
def generate_engine_id():
    """Return a freshly generated random (version 4) UUID as a string."""
    new_id = uuid.uuid4()
    return str(new_id)
def try_acquire(self):
    """
    Make a single attempt to create the lock row for this stack.

    Unlike acquire(), this neither raises ActionInProgress nor tries to
    steal the lock. The result of db_api.stack_lock_create() is handed
    back unchanged. The delete_stack caller treats None as "lock
    obtained" and any other value as the id of the engine that
    currently holds the lock.
    """
    outcome = db_api.stack_lock_create(self.stack.id, self.engine_id)
    return outcome
@rpc_common.client_exceptions(exception.ActionInProgress)
def acquire(self, retry=True):
"""

View File

@ -24,6 +24,8 @@ import mox
from oslo.config import cfg
cfg.CONF.import_opt('engine_life_check_timeout', 'heat.common.config')
from heat.engine import environment
from heat.common import exception
from heat.common import urlfetch
@ -42,9 +44,11 @@ from heat.engine import resource as res
from heat.engine.resources import instance as instances
from heat.engine.resources import nova_utils
from heat.engine import resource as rsrs
from heat.engine import stack_lock
from heat.engine import watchrule
from heat.openstack.common import threadgroup
from heat.openstack.common.rpc import common as rpc_common
from heat.openstack.common.rpc import proxy
from heat.tests.common import HeatTestCase
from heat.tests import generic_resource as generic_rsrc
from heat.tests import utils
@ -670,6 +674,101 @@ class StackServiceCreateUpdateDeleteTest(HeatTestCase):
self.ctx, stack.identifier())
self.m.VerifyAll()
def test_stack_delete_acquired_lock(self):
    # delete_stack() should return None when try_acquire() is stubbed
    # to report this engine's own id.
    # NOTE(review): the test name suggests the "lock freshly acquired"
    # path (try_acquire() returning None), but the stub returns the
    # engine id - confirm which path is intended.
    name = 'service_delete_test_stack'
    stack = get_wordpress_stack(name, self.ctx)
    stored_id = stack.store()

    db_stack = db_api.stack_get(self.ctx, stored_id)
    # Any number of Stack.load() calls for this row return our stack.
    self.m.StubOutWithMock(parser.Stack, 'load')
    load_call = parser.Stack.load(self.ctx, stack=db_stack)
    load_call.MultipleTimes().AndReturn(stack)
    self.man.tg = DummyThreadGroup()

    self.m.StubOutWithMock(stack_lock.StackLock, 'try_acquire')
    stack_lock.StackLock.try_acquire().AndReturn(self.man.engine_id)
    self.m.ReplayAll()

    result = self.man.delete_stack(self.ctx, stack.identifier())
    self.assertIsNone(result)
    self.m.VerifyAll()
def test_stack_delete_current_engine_active_lock(self):
    # delete_stack() should return None when this engine already holds
    # the lock and has a thread group registered for the stack.
    name = 'service_delete_test_stack'
    stack = get_wordpress_stack(name, self.ctx)
    stored_id = stack.store()

    # Pretend this engine already owns the stack lock ...
    db_api.stack_lock_create(stack.id, self.man.engine_id)
    # ... and already has a thread group registered for the stack.
    self.man.thread_group_mgr.groups[stack.id] = DummyThreadGroup()

    db_stack = db_api.stack_get(self.ctx, stored_id)
    self.m.StubOutWithMock(parser.Stack, 'load')
    load_call = parser.Stack.load(self.ctx, stack=db_stack)
    load_call.MultipleTimes().AndReturn(stack)

    self.m.StubOutWithMock(stack_lock.StackLock, 'try_acquire')
    stack_lock.StackLock.try_acquire().AndReturn(self.man.engine_id)
    self.m.ReplayAll()

    result = self.man.delete_stack(self.ctx, stack.identifier())
    self.assertIsNone(result)
    self.m.VerifyAll()
def test_stack_delete_other_engine_active_lock_failed(self):
    # When another engine holds the lock and the stop_stack RPC times
    # out, delete_stack() must raise StopActionFailed.
    name = 'service_delete_test_stack'
    stack = get_wordpress_stack(name, self.ctx)
    stored_id = stack.store()

    # Record a lock row owned by some other engine.
    other_engine = "other-engine-fake-uuid"
    db_api.stack_lock_create(stack.id, other_engine)

    db_stack = db_api.stack_get(self.ctx, stored_id)
    self.m.StubOutWithMock(parser.Stack, 'load')
    parser.Stack.load(self.ctx, stack=db_stack).AndReturn(stack)

    self.m.StubOutWithMock(stack_lock.StackLock, 'try_acquire')
    stack_lock.StackLock.try_acquire().AndReturn(other_engine)

    # Build the expected message before stubbing out RpcProxy.call.
    engine_proxy = proxy.RpcProxy(other_engine, "1.0")
    expected_msg = engine_proxy.make_msg("stop_stack",
                                         stack_identity=mox.IgnoreArg())
    self.m.StubOutWithMock(proxy.RpcProxy, 'call')
    rpc_call = proxy.RpcProxy.call(
        self.ctx, expected_msg, topic='other-engine-fake-uuid',
        timeout=cfg.CONF.engine_life_check_timeout)
    rpc_call.AndRaise(rpc_common.Timeout)
    self.m.ReplayAll()

    self.assertRaises(exception.StopActionFailed,
                      self.man.delete_stack, self.ctx, stack.identifier())
    self.m.VerifyAll()
def test_stack_delete_other_engine_active_lock_succeeded(self):
    # When another engine holds the lock and the stop_stack RPC
    # returns None, delete_stack() should go on to acquire the lock
    # and return None.
    name = 'service_delete_test_stack'
    stack = get_wordpress_stack(name, self.ctx)
    stored_id = stack.store()

    # Record a lock row owned by some other engine.
    other_engine = "other-engine-fake-uuid"
    db_api.stack_lock_create(stack.id, other_engine)

    db_stack = db_api.stack_get(self.ctx, stored_id)
    self.m.StubOutWithMock(parser.Stack, 'load')
    load_call = parser.Stack.load(self.ctx, stack=db_stack)
    load_call.MultipleTimes().AndReturn(stack)

    self.m.StubOutWithMock(stack_lock.StackLock, 'try_acquire')
    stack_lock.StackLock.try_acquire().AndReturn(other_engine)

    # Build the expected message before stubbing out RpcProxy.call.
    engine_proxy = proxy.RpcProxy(other_engine, "1.0")
    expected_msg = engine_proxy.make_msg("stop_stack",
                                         stack_identity=mox.IgnoreArg())
    self.m.StubOutWithMock(proxy.RpcProxy, 'call')
    rpc_call = proxy.RpcProxy.call(
        self.ctx, expected_msg, topic='other-engine-fake-uuid',
        timeout=cfg.CONF.engine_life_check_timeout)
    rpc_call.AndReturn(None)

    self.m.StubOutWithMock(stack_lock.StackLock, 'acquire')
    stack_lock.StackLock.acquire().AndReturn(None)
    self.m.ReplayAll()

    result = self.man.delete_stack(self.ctx, stack.identifier())
    self.assertIsNone(result)
    self.m.VerifyAll()
def test_stack_update(self):
stack_name = 'service_update_test_stack'
params = {'foo': 'bar'}