Merge "[Event-engine] Allow event_engine to work in HA"
This commit is contained in:
commit
0eb1611327
|
@ -405,6 +405,4 @@ class DefaultEventEngine(base.EventEngine):
|
|||
|
||||
return
|
||||
|
||||
security.delete_trust(trigger['trust_id'])
|
||||
|
||||
self._add_event_listener(trigger['exchange'], trigger['topic'], events)
|
||||
|
|
|
@ -137,7 +137,7 @@ class RPCClient(object):
|
|||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def async_call(self, ctx, method, target=None, **kwargs):
|
||||
def async_call(self, ctx, method, target=None, fanout=False, **kwargs):
|
||||
"""Asynchronous call of RPC method.
|
||||
|
||||
Does not block the thread, just send invoking data to
|
||||
|
|
|
@ -397,26 +397,29 @@ class EventEngineClient(evt_eng.EventEngine):
|
|||
self._client = base.get_rpc_client_driver()(rpc_conf_dict)
|
||||
|
||||
def create_event_trigger(self, trigger, events):
|
||||
return self._client.sync_call(
|
||||
return self._client.async_call(
|
||||
auth_ctx.ctx(),
|
||||
'create_event_trigger',
|
||||
trigger=trigger,
|
||||
events=events
|
||||
events=events,
|
||||
fanout=True,
|
||||
)
|
||||
|
||||
def delete_event_trigger(self, trigger, events):
|
||||
return self._client.sync_call(
|
||||
return self._client.async_call(
|
||||
auth_ctx.ctx(),
|
||||
'delete_event_trigger',
|
||||
trigger=trigger,
|
||||
events=events
|
||||
events=events,
|
||||
fanout=True,
|
||||
)
|
||||
|
||||
def update_event_trigger(self, trigger):
|
||||
return self._client.sync_call(
|
||||
return self._client.async_call(
|
||||
auth_ctx.ctx(),
|
||||
'update_event_trigger',
|
||||
trigger=trigger,
|
||||
fanout=True,
|
||||
)
|
||||
|
||||
|
||||
|
|
|
@ -206,5 +206,5 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
|
|||
def sync_call(self, ctx, method, target=None, **kwargs):
|
||||
return self._call(ctx, method, async_=False, target=target, **kwargs)
|
||||
|
||||
def async_call(self, ctx, method, target=None, **kwargs):
|
||||
def async_call(self, ctx, method, target=None, fanout=False, **kwargs):
|
||||
return self._call(ctx, method, async_=True, target=target, **kwargs)
|
||||
|
|
|
@ -38,9 +38,7 @@ class OsloRPCClient(rpc.RPCClient):
|
|||
**kwargs
|
||||
)
|
||||
|
||||
def async_call(self, ctx, method, target=None, **kwargs):
|
||||
return self._client.prepare(topic=self.topic, server=target).cast(
|
||||
ctx,
|
||||
method,
|
||||
**kwargs
|
||||
)
|
||||
def async_call(self, ctx, method, target=None, fanout=False, **kwargs):
|
||||
return self._client.prepare(topic=self.topic,
|
||||
server=target,
|
||||
fanout=fanout).cast(ctx, method, **kwargs)
|
||||
|
|
|
@ -221,6 +221,8 @@ def delete_event_trigger(event_trigger):
|
|||
list(events)
|
||||
)
|
||||
|
||||
security.delete_trust(event_trigger['trust_id'])
|
||||
|
||||
|
||||
def update_event_trigger(id, values):
|
||||
trig = db_api.update_event_trigger(id, values)
|
||||
|
|
|
@ -21,6 +21,7 @@ import sqlalchemy as sa
|
|||
from mistral.db.v2 import api as db_api
|
||||
from mistral.db.v2.sqlalchemy import models
|
||||
from mistral import exceptions as exc
|
||||
from mistral.services import security
|
||||
from mistral.services import triggers
|
||||
from mistral.tests.unit.api import base
|
||||
from mistral.tests.unit import base as unit_base
|
||||
|
@ -207,14 +208,22 @@ class TestEventTriggerController(base.APITest):
|
|||
self.assertEqual(400, resp.status_int)
|
||||
|
||||
@mock.patch('mistral.rpc.clients.get_event_engine_client')
|
||||
@mock.patch.object(db_api, "get_event_trigger", MOCK_TRIGGER)
|
||||
@mock.patch('mistral.db.v2.api.get_event_trigger')
|
||||
@mock.patch.object(db_api, "get_event_triggers",
|
||||
mock.MagicMock(return_value=[]))
|
||||
@mock.patch.object(db_api, "delete_event_trigger", MOCK_NONE)
|
||||
def test_delete(self, mock_rpc_client):
|
||||
@mock.patch.object(security, "delete_trust", MOCK_NONE)
|
||||
def test_delete(self, mock_delete, mock_rpc_client):
|
||||
client = mock.Mock()
|
||||
mock_rpc_client.return_value = client
|
||||
|
||||
DELETE_TRIGGER = models.EventTrigger()
|
||||
DELETE_TRIGGER.update(trigger_values)
|
||||
DELETE_TRIGGER.update(
|
||||
{'trust_id': 'c30e50e8-ee7d-4f8a-9515-f0530d9dc54b'}
|
||||
)
|
||||
mock_delete.return_value = DELETE_TRIGGER
|
||||
|
||||
resp = self.app.delete(
|
||||
'/v2/event_triggers/09cc56a9-d15e-4494-a6e2-c4ec8bdaacae'
|
||||
)
|
||||
|
@ -223,7 +232,7 @@ class TestEventTriggerController(base.APITest):
|
|||
self.assertEqual(1, client.delete_event_trigger.call_count)
|
||||
|
||||
self.assertDictEqual(
|
||||
TRIGGER_DB.to_dict(),
|
||||
DELETE_TRIGGER.to_dict(),
|
||||
client.delete_event_trigger.call_args[0][0]
|
||||
)
|
||||
self.assertListEqual(
|
||||
|
|
Loading…
Reference in New Issue