diff --git a/meta/io.murano/Classes/Environment.yaml b/meta/io.murano/Classes/Environment.yaml index c8711eb56..db6dc472c 100644 --- a/meta/io.murano/Classes/Environment.yaml +++ b/meta/io.murano/Classes/Environment.yaml @@ -59,15 +59,11 @@ Methods: deploy: Usage: Action Body: - Try: - - $.agentListener.start() - - If: len($.applications) = 0 - Then: - - $.stack.delete() - Else: - - $.applications.pselect($.deploy()) - Finally: - - $.agentListener.stop() + - If: len($.applications) = 0 + Then: + - $.stack.delete() + Else: + - $.applications.pselect($.deploy()) destroy: Body: diff --git a/murano/common/engine.py b/murano/common/engine.py index b6bd7b584..84fdef610 100644 --- a/murano/common/engine.py +++ b/murano/common/engine.py @@ -114,6 +114,7 @@ class TaskExecutor(object): obj = exc.load(self.model) try: + self.environment.start() # Skip execution of action in case of no action is provided. # Model will be just loaded, cleaned-up and unloaded. # Most of the time this is used for deletion of environments. @@ -127,6 +128,8 @@ class TaskExecutor(object): reporter = status_reporter.StatusReporter() reporter.initialize(obj) reporter.report_error(obj, str(e)) + finally: + self.environment.finish() return results_serializer.serialize(obj, exc) diff --git a/murano/engine/environment.py b/murano/engine/environment.py index e696a4677..53638788c 100644 --- a/murano/engine/environment.py +++ b/murano/engine/environment.py @@ -13,8 +13,37 @@ # See the License for the specific language governing permissions and # limitations under the License. +import murano.openstack.common.log as logging + + +LOG = logging.getLogger(__name__) + class Environment(object): def __init__(self): self.token = None self.tenant_id = None + self._set_up_list = [] + self._tear_down_list = [] + + def on_session_start(self, delegate): + self._set_up_list.append(delegate) + + def on_session_finish(self, delegate): + self._tear_down_list.append(delegate) + + def start(self): + for delegate in self._set_up_list: + try: + delegate() + except Exception as e: + LOG.exception(e) + self._set_up_list = [] + + def finish(self): + for delegate in self._tear_down_list: + try: + delegate() + except Exception as e: + LOG.exception(e) + self._tear_down_list = [] diff --git a/murano/engine/system/agent.py b/murano/engine/system/agent.py index 3d5fd2669..796a454bb 100644 --- a/murano/engine/system/agent.py +++ b/murano/engine/system/agent.py @@ -77,14 +77,13 @@ class Agent(murano_object.MuranoObject): "Use of murano-agent is disallowed " "by the server configuration") - def _send(self, template, wait_results): + def _send(self, template, wait_results, _context): """Send a message over the MQ interface.""" msg_id = template.get('ID', uuid.uuid4().hex) if wait_results: event = eventlet.event.Event() listener = self._environment.agentListener - listener.subscribe(msg_id, event) - listener.start() + listener.subscribe(msg_id, event, _context) msg = messaging.Message() msg.body = template @@ -106,23 +105,23 @@ class Agent(murano_object.MuranoObject): else: return None - def call(self, template, resources): + def call(self, template, resources, _context): self._check_enabled() plan = self.buildExecutionPlan(template, resources) - return self._send(plan, True) + return self._send(plan, True, _context) - def send(self, template, resources): + def send(self, template, resources, _context): self._check_enabled() plan = self.buildExecutionPlan(template, resources) - return self._send(plan, False) + return self._send(plan, False, _context) - def callRaw(self, plan): + def callRaw(self, plan, _context): self._check_enabled() - return self._send(plan, True) + return self._send(plan, True, _context) - def sendRaw(self, plan): + def sendRaw(self, plan, _context): self._check_enabled() - return self._send(plan, False) + return self._send(plan, False, _context) def _process_v1_result(self, result): if result['IsException']: diff --git a/murano/engine/system/agent_listener.py b/murano/engine/system/agent_listener.py index ebc7b5ddd..630839d39 100644 --- a/murano/engine/system/agent_listener.py +++ b/murano/engine/system/agent_listener.py @@ -14,8 +14,10 @@ # limitations under the License. import eventlet +import greenlet import murano.common.config as config +from murano.dsl import helpers import murano.dsl.murano_class as murano_class import murano.dsl.murano_object as murano_object import murano.engine.system.common as common @@ -47,13 +49,15 @@ class AgentListener(murano_object.MuranoObject): def queueName(self): return self._results_queue - def start(self): + def start(self, _context): if config.CONF.engine.disable_murano_agent: # Noop LOG.debug("murano-agent is disabled by the server") return if self._receive_thread is None: + helpers.get_environment(_context).on_session_finish( + lambda: self.stop()) self._receive_thread = eventlet.spawn(self._receive) def stop(self): @@ -63,16 +67,22 @@ class AgentListener(murano_object.MuranoObject): return if self._receive_thread is not None: - self._receive_thread.kill() - self._receive_thread = None + try: + self._receive_thread.kill() + self._receive_thread.wait() + except greenlet.GreenletExit: + pass + finally: + self._receive_thread = None - def subscribe(self, message_id, event): + def subscribe(self, message_id, event, _context): if config.CONF.engine.disable_murano_agent: raise AgentListenerException( "Use of murano-agent is disallowed " "by the server configuration") self._subscriptions[message_id] = event + self.start(_context) def _receive(self): with common.create_rmq_client() as client: diff --git a/murano/tests/unit/dsl/test_agent.py b/murano/tests/unit/dsl/test_agent.py index 82a346baa..5a7b46440 100644 --- a/murano/tests/unit/dsl/test_agent.py +++ b/murano/tests/unit/dsl/test_agent.py @@ -15,6 +15,9 @@ import mock +import yaql.context + +from murano.engine import environment from murano.engine.system import agent from murano.engine.system import agent_listener from murano.tests.unit.dsl.foundation import object_model as om @@ -30,12 +33,14 @@ class TestAgentListener(test_case.DslTestCase): model = om.Object( 'AgentListenerTests') self.runner = self.new_runner(model) + self.context = yaql.context.Context() + self.context.set_data(environment.Environment(), '?environment') def test_listener_enabled(self): self.override_config('disable_murano_agent', False, 'engine') al = self.runner.testAgentListener() self.assertTrue(al.enabled) - al.subscribe('msgid', 'event') + al.subscribe('msgid', 'event', self.context) self.assertEqual({'msgid': 'event'}, al._subscriptions) def test_listener_disabled(self): @@ -43,7 +48,7 @@ class TestAgentListener(test_case.DslTestCase): al = self.runner.testAgentListener() self.assertFalse(al.enabled) self.assertRaises(agent_listener.AgentListenerException, - al.subscribe, 'msgid', 'event') + al.subscribe, 'msgid', 'event', None) class TestAgent(test_case.DslTestCase): @@ -69,14 +74,14 @@ class TestAgent(test_case.DslTestCase): with mock.patch(agent_cls + '._send') as s: s.return_value = mock.MagicMock() - a.sendRaw({}) - s.assert_called_with({}, False) + a.sendRaw({}, None) + s.assert_called_with({}, False, None) def test_agent_disabled(self): self.override_config('disable_murano_agent', True, 'engine') a = self.runner.testAgent() self.assertFalse(a.enabled) - self.assertRaises(agent.AgentException, a.call, {}, None) - self.assertRaises(agent.AgentException, a.send, {}, None) - self.assertRaises(agent.AgentException, a.callRaw, {}) - self.assertRaises(agent.AgentException, a.sendRaw, {}) + self.assertRaises(agent.AgentException, a.call, {}, None, None) + self.assertRaises(agent.AgentException, a.send, {}, None, None) + self.assertRaises(agent.AgentException, a.callRaw, {}, None) + self.assertRaises(agent.AgentException, a.sendRaw, {}, None)