Fixes agent call may hanged upon action call

When action called AgentListener automatically starts listening upon
first EP send to the agent. But Environment.deploy() were the only
place where AgentLister was stopped. So when action other than
Environment.deploy() was called there is no one to stop listener.
Thus on each action call new listener on the same RabbitMQ queue
was started causing listeners to steal messages from each other.
Agent.call() that never received response from agent caused
deployment/action hang

Change-Id: I466bbf60c35e0d6a0bc6e831010e552aaa12eaab
This commit is contained in:
Stan Lagun 2015-02-26 01:02:58 +03:00
parent d728141334
commit 1d7f0b1279
6 changed files with 74 additions and 32 deletions

View File

@ -59,15 +59,11 @@ Methods:
deploy:
Usage: Action
Body:
Try:
- $.agentListener.start()
- If: len($.applications) = 0
Then:
- $.stack.delete()
Else:
- $.applications.pselect($.deploy())
Finally:
- $.agentListener.stop()
- If: len($.applications) = 0
Then:
- $.stack.delete()
Else:
- $.applications.pselect($.deploy())
destroy:
Body:

View File

@ -114,6 +114,7 @@ class TaskExecutor(object):
obj = exc.load(self.model)
try:
self.environment.start()
# Skip execution of action in case of no action is provided.
# Model will be just loaded, cleaned-up and unloaded.
# Most of the time this is used for deletion of environments.
@ -127,6 +128,8 @@ class TaskExecutor(object):
reporter = status_reporter.StatusReporter()
reporter.initialize(obj)
reporter.report_error(obj, str(e))
finally:
self.environment.finish()
return results_serializer.serialize(obj, exc)

View File

@ -13,8 +13,37 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import murano.openstack.common.log as logging
LOG = logging.getLogger(__name__)
class Environment(object):
def __init__(self):
self.token = None
self.tenant_id = None
self._set_up_list = []
self._tear_down_list = []
def on_session_start(self, delegate):
self._set_up_list.append(delegate)
def on_session_finish(self, delegate):
self._tear_down_list.append(delegate)
def start(self):
for delegate in self._set_up_list:
try:
delegate()
except Exception as e:
LOG.exception(e)
self._set_up_list = []
def finish(self):
for delegate in self._tear_down_list:
try:
delegate()
except Exception as e:
LOG.exception(e)
self._tear_down_list = []

View File

@ -77,14 +77,13 @@ class Agent(murano_object.MuranoObject):
"Use of murano-agent is disallowed "
"by the server configuration")
def _send(self, template, wait_results):
def _send(self, template, wait_results, _context):
"""Send a message over the MQ interface."""
msg_id = template.get('ID', uuid.uuid4().hex)
if wait_results:
event = eventlet.event.Event()
listener = self._environment.agentListener
listener.subscribe(msg_id, event)
listener.start()
listener.subscribe(msg_id, event, _context)
msg = messaging.Message()
msg.body = template
@ -106,23 +105,23 @@ class Agent(murano_object.MuranoObject):
else:
return None
def call(self, template, resources):
def call(self, template, resources, _context):
self._check_enabled()
plan = self.buildExecutionPlan(template, resources)
return self._send(plan, True)
return self._send(plan, True, _context)
def send(self, template, resources):
def send(self, template, resources, _context):
self._check_enabled()
plan = self.buildExecutionPlan(template, resources)
return self._send(plan, False)
return self._send(plan, False, _context)
def callRaw(self, plan):
def callRaw(self, plan, _context):
self._check_enabled()
return self._send(plan, True)
return self._send(plan, True, _context)
def sendRaw(self, plan):
def sendRaw(self, plan, _context):
self._check_enabled()
return self._send(plan, False)
return self._send(plan, False, _context)
def _process_v1_result(self, result):
if result['IsException']:

View File

@ -14,8 +14,10 @@
# limitations under the License.
import eventlet
import greenlet
import murano.common.config as config
from murano.dsl import helpers
import murano.dsl.murano_class as murano_class
import murano.dsl.murano_object as murano_object
import murano.engine.system.common as common
@ -47,13 +49,15 @@ class AgentListener(murano_object.MuranoObject):
def queueName(self):
return self._results_queue
def start(self):
def start(self, _context):
if config.CONF.engine.disable_murano_agent:
# Noop
LOG.debug("murano-agent is disabled by the server")
return
if self._receive_thread is None:
helpers.get_environment(_context).on_session_finish(
lambda: self.stop())
self._receive_thread = eventlet.spawn(self._receive)
def stop(self):
@ -63,16 +67,22 @@ class AgentListener(murano_object.MuranoObject):
return
if self._receive_thread is not None:
self._receive_thread.kill()
self._receive_thread = None
try:
self._receive_thread.kill()
self._receive_thread.wait()
except greenlet.GreenletExit:
pass
finally:
self._receive_thread = None
def subscribe(self, message_id, event):
def subscribe(self, message_id, event, _context):
if config.CONF.engine.disable_murano_agent:
raise AgentListenerException(
"Use of murano-agent is disallowed "
"by the server configuration")
self._subscriptions[message_id] = event
self.start(_context)
def _receive(self):
with common.create_rmq_client() as client:

View File

@ -15,6 +15,9 @@
import mock
import yaql.context
from murano.engine import environment
from murano.engine.system import agent
from murano.engine.system import agent_listener
from murano.tests.unit.dsl.foundation import object_model as om
@ -30,12 +33,14 @@ class TestAgentListener(test_case.DslTestCase):
model = om.Object(
'AgentListenerTests')
self.runner = self.new_runner(model)
self.context = yaql.context.Context()
self.context.set_data(environment.Environment(), '?environment')
def test_listener_enabled(self):
self.override_config('disable_murano_agent', False, 'engine')
al = self.runner.testAgentListener()
self.assertTrue(al.enabled)
al.subscribe('msgid', 'event')
al.subscribe('msgid', 'event', self.context)
self.assertEqual({'msgid': 'event'}, al._subscriptions)
def test_listener_disabled(self):
@ -43,7 +48,7 @@ class TestAgentListener(test_case.DslTestCase):
al = self.runner.testAgentListener()
self.assertFalse(al.enabled)
self.assertRaises(agent_listener.AgentListenerException,
al.subscribe, 'msgid', 'event')
al.subscribe, 'msgid', 'event', None)
class TestAgent(test_case.DslTestCase):
@ -69,14 +74,14 @@ class TestAgent(test_case.DslTestCase):
with mock.patch(agent_cls + '._send') as s:
s.return_value = mock.MagicMock()
a.sendRaw({})
s.assert_called_with({}, False)
a.sendRaw({}, None)
s.assert_called_with({}, False, None)
def test_agent_disabled(self):
self.override_config('disable_murano_agent', True, 'engine')
a = self.runner.testAgent()
self.assertFalse(a.enabled)
self.assertRaises(agent.AgentException, a.call, {}, None)
self.assertRaises(agent.AgentException, a.send, {}, None)
self.assertRaises(agent.AgentException, a.callRaw, {})
self.assertRaises(agent.AgentException, a.sendRaw, {})
self.assertRaises(agent.AgentException, a.call, {}, None, None)
self.assertRaises(agent.AgentException, a.send, {}, None, None)
self.assertRaises(agent.AgentException, a.callRaw, {}, None)
self.assertRaises(agent.AgentException, a.sendRaw, {}, None)