From ec06ccf65a69fdfe95f2f669591ff2a4a3aca7cc Mon Sep 17 00:00:00 2001 From: Jose Castro Leon Date: Mon, 26 Feb 2018 16:00:27 +0100 Subject: [PATCH] [Event-engine] Allow event_engine to work in HA A previous patch allows to make multiple event_engines to listen to a single queue, but the RPC calls on CRUD are still synchronous This patch modifies the calls and broadcasts them on all the event engines allow them to modify each independent listeners. Closes-Bug: #1715848 Change-Id: Ia37831a03993f5a1bf980d62344d25377062788d (cherry picked from commit defff0877392400e572dca772d24b2c6fe49dca1) --- mistral/event_engine/default_event_engine.py | 2 -- mistral/rpc/base.py | 2 +- mistral/rpc/clients.py | 13 ++++++++----- mistral/rpc/kombu/kombu_client.py | 2 +- mistral/rpc/oslo/oslo_client.py | 10 ++++------ mistral/services/triggers.py | 2 ++ mistral/tests/unit/api/v2/test_event_trigger.py | 15 ++++++++++++--- 7 files changed, 28 insertions(+), 18 deletions(-) diff --git a/mistral/event_engine/default_event_engine.py b/mistral/event_engine/default_event_engine.py index 12a7b8e63..e0f68e6f8 100644 --- a/mistral/event_engine/default_event_engine.py +++ b/mistral/event_engine/default_event_engine.py @@ -405,6 +405,4 @@ class DefaultEventEngine(base.EventEngine): return - security.delete_trust(trigger['trust_id']) - self._add_event_listener(trigger['exchange'], trigger['topic'], events) diff --git a/mistral/rpc/base.py b/mistral/rpc/base.py index 1139cb550..9c2ba7fbe 100644 --- a/mistral/rpc/base.py +++ b/mistral/rpc/base.py @@ -137,7 +137,7 @@ class RPCClient(object): raise NotImplementedError @abc.abstractmethod - def async_call(self, ctx, method, target=None, **kwargs): + def async_call(self, ctx, method, target=None, fanout=False, **kwargs): """Asynchronous call of RPC method. Does not block the thread, just send invoking data to diff --git a/mistral/rpc/clients.py b/mistral/rpc/clients.py index ce2219b56..67b5c88c8 100644 --- a/mistral/rpc/clients.py +++ b/mistral/rpc/clients.py @@ -397,26 +397,29 @@ class EventEngineClient(evt_eng.EventEngine): self._client = base.get_rpc_client_driver()(rpc_conf_dict) def create_event_trigger(self, trigger, events): - return self._client.sync_call( + return self._client.async_call( auth_ctx.ctx(), 'create_event_trigger', trigger=trigger, - events=events + events=events, + fanout=True, ) def delete_event_trigger(self, trigger, events): - return self._client.sync_call( + return self._client.async_call( auth_ctx.ctx(), 'delete_event_trigger', trigger=trigger, - events=events + events=events, + fanout=True, ) def update_event_trigger(self, trigger): - return self._client.sync_call( + return self._client.async_call( auth_ctx.ctx(), 'update_event_trigger', trigger=trigger, + fanout=True, ) diff --git a/mistral/rpc/kombu/kombu_client.py b/mistral/rpc/kombu/kombu_client.py index 3734d7ef4..87fbbd2e7 100644 --- a/mistral/rpc/kombu/kombu_client.py +++ b/mistral/rpc/kombu/kombu_client.py @@ -206,5 +206,5 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base): def sync_call(self, ctx, method, target=None, **kwargs): return self._call(ctx, method, async_=False, target=target, **kwargs) - def async_call(self, ctx, method, target=None, **kwargs): + def async_call(self, ctx, method, target=None, fanout=False, **kwargs): return self._call(ctx, method, async_=True, target=target, **kwargs) diff --git a/mistral/rpc/oslo/oslo_client.py b/mistral/rpc/oslo/oslo_client.py index c0795db19..e9878a50b 100644 --- a/mistral/rpc/oslo/oslo_client.py +++ b/mistral/rpc/oslo/oslo_client.py @@ -38,9 +38,7 @@ class OsloRPCClient(rpc.RPCClient): **kwargs ) - def async_call(self, ctx, method, target=None, **kwargs): - return self._client.prepare(topic=self.topic, server=target).cast( - ctx, - method, - **kwargs - ) + def async_call(self, ctx, method, target=None, fanout=False, **kwargs): + return self._client.prepare(topic=self.topic, + server=target, + fanout=fanout).cast(ctx, method, **kwargs) diff --git a/mistral/services/triggers.py b/mistral/services/triggers.py index d1f4fac86..732e9b48f 100644 --- a/mistral/services/triggers.py +++ b/mistral/services/triggers.py @@ -221,6 +221,8 @@ def delete_event_trigger(event_trigger): list(events) ) + security.delete_trust(event_trigger['trust_id']) + def update_event_trigger(id, values): trig = db_api.update_event_trigger(id, values) diff --git a/mistral/tests/unit/api/v2/test_event_trigger.py b/mistral/tests/unit/api/v2/test_event_trigger.py index 06aae2e57..dbdf57956 100644 --- a/mistral/tests/unit/api/v2/test_event_trigger.py +++ b/mistral/tests/unit/api/v2/test_event_trigger.py @@ -21,6 +21,7 @@ import sqlalchemy as sa from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models from mistral import exceptions as exc +from mistral.services import security from mistral.services import triggers from mistral.tests.unit.api import base from mistral.tests.unit import base as unit_base @@ -207,14 +208,22 @@ class TestEventTriggerController(base.APITest): self.assertEqual(400, resp.status_int) @mock.patch('mistral.rpc.clients.get_event_engine_client') - @mock.patch.object(db_api, "get_event_trigger", MOCK_TRIGGER) + @mock.patch('mistral.db.v2.api.get_event_trigger') @mock.patch.object(db_api, "get_event_triggers", mock.MagicMock(return_value=[])) @mock.patch.object(db_api, "delete_event_trigger", MOCK_NONE) - def test_delete(self, mock_rpc_client): + @mock.patch.object(security, "delete_trust", MOCK_NONE) + def test_delete(self, mock_delete, mock_rpc_client): client = mock.Mock() mock_rpc_client.return_value = client + DELETE_TRIGGER = models.EventTrigger() + DELETE_TRIGGER.update(trigger_values) + DELETE_TRIGGER.update( + {'trust_id': 'c30e50e8-ee7d-4f8a-9515-f0530d9dc54b'} + ) + mock_delete.return_value = DELETE_TRIGGER + resp = self.app.delete( '/v2/event_triggers/09cc56a9-d15e-4494-a6e2-c4ec8bdaacae' ) @@ -223,7 +232,7 @@ class TestEventTriggerController(base.APITest): self.assertEqual(1, client.delete_event_trigger.call_count) self.assertDictEqual( - TRIGGER_DB.to_dict(), + DELETE_TRIGGER.to_dict(), client.delete_event_trigger.call_args[0][0] ) self.assertListEqual(