Merge "[Event-engine] Allow event_engine to work in HA"

This commit is contained in:
Zuul 2018-12-09 11:31:19 +00:00 committed by Gerrit Code Review
commit 0eb1611327
7 changed files with 28 additions and 18 deletions

View File

@ -405,6 +405,4 @@ class DefaultEventEngine(base.EventEngine):
return
security.delete_trust(trigger['trust_id'])
self._add_event_listener(trigger['exchange'], trigger['topic'], events)

View File

@ -137,7 +137,7 @@ class RPCClient(object):
raise NotImplementedError
@abc.abstractmethod
def async_call(self, ctx, method, target=None, **kwargs):
def async_call(self, ctx, method, target=None, fanout=False, **kwargs):
"""Asynchronous call of RPC method.
Does not block the thread, just send invoking data to

View File

@ -397,26 +397,29 @@ class EventEngineClient(evt_eng.EventEngine):
self._client = base.get_rpc_client_driver()(rpc_conf_dict)
def create_event_trigger(self, trigger, events):
return self._client.sync_call(
return self._client.async_call(
auth_ctx.ctx(),
'create_event_trigger',
trigger=trigger,
events=events
events=events,
fanout=True,
)
def delete_event_trigger(self, trigger, events):
return self._client.sync_call(
return self._client.async_call(
auth_ctx.ctx(),
'delete_event_trigger',
trigger=trigger,
events=events
events=events,
fanout=True,
)
def update_event_trigger(self, trigger):
return self._client.sync_call(
return self._client.async_call(
auth_ctx.ctx(),
'update_event_trigger',
trigger=trigger,
fanout=True,
)

View File

@ -206,5 +206,5 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
def sync_call(self, ctx, method, target=None, **kwargs):
return self._call(ctx, method, async_=False, target=target, **kwargs)
def async_call(self, ctx, method, target=None, **kwargs):
def async_call(self, ctx, method, target=None, fanout=False, **kwargs):
return self._call(ctx, method, async_=True, target=target, **kwargs)

View File

@ -38,9 +38,7 @@ class OsloRPCClient(rpc.RPCClient):
**kwargs
)
def async_call(self, ctx, method, target=None, **kwargs):
return self._client.prepare(topic=self.topic, server=target).cast(
ctx,
method,
**kwargs
)
def async_call(self, ctx, method, target=None, fanout=False, **kwargs):
return self._client.prepare(topic=self.topic,
server=target,
fanout=fanout).cast(ctx, method, **kwargs)

View File

@ -221,6 +221,8 @@ def delete_event_trigger(event_trigger):
list(events)
)
security.delete_trust(event_trigger['trust_id'])
def update_event_trigger(id, values):
trig = db_api.update_event_trigger(id, values)

View File

@ -21,6 +21,7 @@ import sqlalchemy as sa
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models
from mistral import exceptions as exc
from mistral.services import security
from mistral.services import triggers
from mistral.tests.unit.api import base
from mistral.tests.unit import base as unit_base
@ -207,14 +208,22 @@ class TestEventTriggerController(base.APITest):
self.assertEqual(400, resp.status_int)
@mock.patch('mistral.rpc.clients.get_event_engine_client')
@mock.patch.object(db_api, "get_event_trigger", MOCK_TRIGGER)
@mock.patch('mistral.db.v2.api.get_event_trigger')
@mock.patch.object(db_api, "get_event_triggers",
mock.MagicMock(return_value=[]))
@mock.patch.object(db_api, "delete_event_trigger", MOCK_NONE)
def test_delete(self, mock_rpc_client):
@mock.patch.object(security, "delete_trust", MOCK_NONE)
def test_delete(self, mock_delete, mock_rpc_client):
client = mock.Mock()
mock_rpc_client.return_value = client
DELETE_TRIGGER = models.EventTrigger()
DELETE_TRIGGER.update(trigger_values)
DELETE_TRIGGER.update(
{'trust_id': 'c30e50e8-ee7d-4f8a-9515-f0530d9dc54b'}
)
mock_delete.return_value = DELETE_TRIGGER
resp = self.app.delete(
'/v2/event_triggers/09cc56a9-d15e-4494-a6e2-c4ec8bdaacae'
)
@ -223,7 +232,7 @@ class TestEventTriggerController(base.APITest):
self.assertEqual(1, client.delete_event_trigger.call_count)
self.assertDictEqual(
TRIGGER_DB.to_dict(),
DELETE_TRIGGER.to_dict(),
client.delete_event_trigger.call_args[0][0]
)
self.assertListEqual(