[Event-engine] Allow event_engine to work in HA

A previous patch allows to make multiple event_engines to listen
to a single queue, but the RPC calls on CRUD are still synchronous

This patch modifies the calls and broadcasts them on all the event
engines allow them to modify each independent listeners.

Closes-Bug: #1715848
Change-Id: Ia37831a03993f5a1bf980d62344d25377062788d
This commit is contained in:
Jose Castro Leon 2018-02-26 16:00:27 +01:00 committed by Vlad Gusev
parent 70c269e7d1
commit defff08773
7 changed files with 28 additions and 18 deletions

View File

@ -405,6 +405,4 @@ class DefaultEventEngine(base.EventEngine):
return
security.delete_trust(trigger['trust_id'])
self._add_event_listener(trigger['exchange'], trigger['topic'], events)

View File

@ -137,7 +137,7 @@ class RPCClient(object):
raise NotImplementedError
@abc.abstractmethod
def async_call(self, ctx, method, target=None, **kwargs):
def async_call(self, ctx, method, target=None, fanout=False, **kwargs):
"""Asynchronous call of RPC method.
Does not block the thread, just send invoking data to

View File

@ -392,26 +392,29 @@ class EventEngineClient(evt_eng.EventEngine):
self._client = base.get_rpc_client_driver()(rpc_conf_dict)
def create_event_trigger(self, trigger, events):
return self._client.sync_call(
return self._client.async_call(
auth_ctx.ctx(),
'create_event_trigger',
trigger=trigger,
events=events
events=events,
fanout=True,
)
def delete_event_trigger(self, trigger, events):
return self._client.sync_call(
return self._client.async_call(
auth_ctx.ctx(),
'delete_event_trigger',
trigger=trigger,
events=events
events=events,
fanout=True,
)
def update_event_trigger(self, trigger):
return self._client.sync_call(
return self._client.async_call(
auth_ctx.ctx(),
'update_event_trigger',
trigger=trigger,
fanout=True,
)

View File

@ -206,5 +206,5 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
def sync_call(self, ctx, method, target=None, **kwargs):
return self._call(ctx, method, async_=False, target=target, **kwargs)
def async_call(self, ctx, method, target=None, **kwargs):
def async_call(self, ctx, method, target=None, fanout=False, **kwargs):
return self._call(ctx, method, async_=True, target=target, **kwargs)

View File

@ -38,9 +38,7 @@ class OsloRPCClient(rpc.RPCClient):
**kwargs
)
def async_call(self, ctx, method, target=None, **kwargs):
return self._client.prepare(topic=self.topic, server=target).cast(
ctx,
method,
**kwargs
)
def async_call(self, ctx, method, target=None, fanout=False, **kwargs):
return self._client.prepare(topic=self.topic,
server=target,
fanout=fanout).cast(ctx, method, **kwargs)

View File

@ -221,6 +221,8 @@ def delete_event_trigger(event_trigger):
list(events)
)
security.delete_trust(event_trigger['trust_id'])
def update_event_trigger(id, values):
trig = db_api.update_event_trigger(id, values)

View File

@ -21,6 +21,7 @@ import sqlalchemy as sa
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models
from mistral import exceptions as exc
from mistral.services import security
from mistral.services import triggers
from mistral.tests.unit.api import base
from mistral.tests.unit import base as unit_base
@ -207,14 +208,22 @@ class TestEventTriggerController(base.APITest):
self.assertEqual(400, resp.status_int)
@mock.patch('mistral.rpc.clients.get_event_engine_client')
@mock.patch.object(db_api, "get_event_trigger", MOCK_TRIGGER)
@mock.patch('mistral.db.v2.api.get_event_trigger')
@mock.patch.object(db_api, "get_event_triggers",
mock.MagicMock(return_value=[]))
@mock.patch.object(db_api, "delete_event_trigger", MOCK_NONE)
def test_delete(self, mock_rpc_client):
@mock.patch.object(security, "delete_trust", MOCK_NONE)
def test_delete(self, mock_delete, mock_rpc_client):
client = mock.Mock()
mock_rpc_client.return_value = client
DELETE_TRIGGER = models.EventTrigger()
DELETE_TRIGGER.update(trigger_values)
DELETE_TRIGGER.update(
{'trust_id': 'c30e50e8-ee7d-4f8a-9515-f0530d9dc54b'}
)
mock_delete.return_value = DELETE_TRIGGER
resp = self.app.delete(
'/v2/event_triggers/09cc56a9-d15e-4494-a6e2-c4ec8bdaacae'
)
@ -223,7 +232,7 @@ class TestEventTriggerController(base.APITest):
self.assertEqual(1, client.delete_event_trigger.call_count)
self.assertDictEqual(
TRIGGER_DB.to_dict(),
DELETE_TRIGGER.to_dict(),
client.delete_event_trigger.call_args[0][0]
)
self.assertListEqual(