[Event-engine] Allow event_engine to work in HA
A previous patch allows to make multiple event_engines to listen
to a single queue, but the RPC calls on CRUD are still synchronous
This patch modifies the calls and broadcasts them on all the event
engines allow them to modify each independent listeners.
Closes-Bug: #1715848
Change-Id: Ia37831a03993f5a1bf980d62344d25377062788d
(cherry picked from commit defff08773
)
This commit is contained in:
parent
ce3d8607af
commit
ec06ccf65a
|
@ -405,6 +405,4 @@ class DefaultEventEngine(base.EventEngine):
|
|||
|
||||
return
|
||||
|
||||
security.delete_trust(trigger['trust_id'])
|
||||
|
||||
self._add_event_listener(trigger['exchange'], trigger['topic'], events)
|
||||
|
|
|
@ -137,7 +137,7 @@ class RPCClient(object):
|
|||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def async_call(self, ctx, method, target=None, **kwargs):
|
||||
def async_call(self, ctx, method, target=None, fanout=False, **kwargs):
|
||||
"""Asynchronous call of RPC method.
|
||||
|
||||
Does not block the thread, just send invoking data to
|
||||
|
|
|
@ -397,26 +397,29 @@ class EventEngineClient(evt_eng.EventEngine):
|
|||
self._client = base.get_rpc_client_driver()(rpc_conf_dict)
|
||||
|
||||
def create_event_trigger(self, trigger, events):
|
||||
return self._client.sync_call(
|
||||
return self._client.async_call(
|
||||
auth_ctx.ctx(),
|
||||
'create_event_trigger',
|
||||
trigger=trigger,
|
||||
events=events
|
||||
events=events,
|
||||
fanout=True,
|
||||
)
|
||||
|
||||
def delete_event_trigger(self, trigger, events):
|
||||
return self._client.sync_call(
|
||||
return self._client.async_call(
|
||||
auth_ctx.ctx(),
|
||||
'delete_event_trigger',
|
||||
trigger=trigger,
|
||||
events=events
|
||||
events=events,
|
||||
fanout=True,
|
||||
)
|
||||
|
||||
def update_event_trigger(self, trigger):
|
||||
return self._client.sync_call(
|
||||
return self._client.async_call(
|
||||
auth_ctx.ctx(),
|
||||
'update_event_trigger',
|
||||
trigger=trigger,
|
||||
fanout=True,
|
||||
)
|
||||
|
||||
|
||||
|
|
|
@ -206,5 +206,5 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
|
|||
def sync_call(self, ctx, method, target=None, **kwargs):
|
||||
return self._call(ctx, method, async_=False, target=target, **kwargs)
|
||||
|
||||
def async_call(self, ctx, method, target=None, **kwargs):
|
||||
def async_call(self, ctx, method, target=None, fanout=False, **kwargs):
|
||||
return self._call(ctx, method, async_=True, target=target, **kwargs)
|
||||
|
|
|
@ -38,9 +38,7 @@ class OsloRPCClient(rpc.RPCClient):
|
|||
**kwargs
|
||||
)
|
||||
|
||||
def async_call(self, ctx, method, target=None, **kwargs):
|
||||
return self._client.prepare(topic=self.topic, server=target).cast(
|
||||
ctx,
|
||||
method,
|
||||
**kwargs
|
||||
)
|
||||
def async_call(self, ctx, method, target=None, fanout=False, **kwargs):
|
||||
return self._client.prepare(topic=self.topic,
|
||||
server=target,
|
||||
fanout=fanout).cast(ctx, method, **kwargs)
|
||||
|
|
|
@ -221,6 +221,8 @@ def delete_event_trigger(event_trigger):
|
|||
list(events)
|
||||
)
|
||||
|
||||
security.delete_trust(event_trigger['trust_id'])
|
||||
|
||||
|
||||
def update_event_trigger(id, values):
|
||||
trig = db_api.update_event_trigger(id, values)
|
||||
|
|
|
@ -21,6 +21,7 @@ import sqlalchemy as sa
|
|||
from mistral.db.v2 import api as db_api
|
||||
from mistral.db.v2.sqlalchemy import models
|
||||
from mistral import exceptions as exc
|
||||
from mistral.services import security
|
||||
from mistral.services import triggers
|
||||
from mistral.tests.unit.api import base
|
||||
from mistral.tests.unit import base as unit_base
|
||||
|
@ -207,14 +208,22 @@ class TestEventTriggerController(base.APITest):
|
|||
self.assertEqual(400, resp.status_int)
|
||||
|
||||
@mock.patch('mistral.rpc.clients.get_event_engine_client')
|
||||
@mock.patch.object(db_api, "get_event_trigger", MOCK_TRIGGER)
|
||||
@mock.patch('mistral.db.v2.api.get_event_trigger')
|
||||
@mock.patch.object(db_api, "get_event_triggers",
|
||||
mock.MagicMock(return_value=[]))
|
||||
@mock.patch.object(db_api, "delete_event_trigger", MOCK_NONE)
|
||||
def test_delete(self, mock_rpc_client):
|
||||
@mock.patch.object(security, "delete_trust", MOCK_NONE)
|
||||
def test_delete(self, mock_delete, mock_rpc_client):
|
||||
client = mock.Mock()
|
||||
mock_rpc_client.return_value = client
|
||||
|
||||
DELETE_TRIGGER = models.EventTrigger()
|
||||
DELETE_TRIGGER.update(trigger_values)
|
||||
DELETE_TRIGGER.update(
|
||||
{'trust_id': 'c30e50e8-ee7d-4f8a-9515-f0530d9dc54b'}
|
||||
)
|
||||
mock_delete.return_value = DELETE_TRIGGER
|
||||
|
||||
resp = self.app.delete(
|
||||
'/v2/event_triggers/09cc56a9-d15e-4494-a6e2-c4ec8bdaacae'
|
||||
)
|
||||
|
@ -223,7 +232,7 @@ class TestEventTriggerController(base.APITest):
|
|||
self.assertEqual(1, client.delete_event_trigger.call_count)
|
||||
|
||||
self.assertDictEqual(
|
||||
TRIGGER_DB.to_dict(),
|
||||
DELETE_TRIGGER.to_dict(),
|
||||
client.delete_event_trigger.call_args[0][0]
|
||||
)
|
||||
self.assertListEqual(
|
||||
|
|
Loading…
Reference in New Issue