Define self.client in MessagingCore
Currently self.client is referenced within MessagingCore, but no definition is made in its constructor. Additionally self.client is defined in children classes of MessagingCore. This patchset defines self.client in the constructor of MessagingCore and removes the redefinition in its children. -self.client lazily loaded Co-Authored-By: v-francoise <Vincent.FRANCOISE@b-com.com> Change-Id: I14525a175bf1ebde3d2636024ad2f2219c79d6e1 Closes-Bug: #1521636
This commit is contained in:
parent
7406a1e713
commit
2f0c1c12cf
|
@ -43,8 +43,8 @@ class DefaultActionPlanHandler(base.BaseActionPlanHandler):
|
|||
ev.data = {}
|
||||
payload = {'action_plan__uuid': uuid,
|
||||
'action_plan_state': state}
|
||||
self.applier_manager.topic_status.publish_event(ev.type.name,
|
||||
payload)
|
||||
self.applier_manager.status_topic_handler.publish_event(
|
||||
ev.type.name, payload)
|
||||
|
||||
def execute(self):
|
||||
try:
|
||||
|
|
|
@ -34,12 +34,12 @@ APPLIER_MANAGER_OPTS = [
|
|||
min=1,
|
||||
required=True,
|
||||
help='Number of workers for applier, default value is 1.'),
|
||||
cfg.StrOpt('topic_control',
|
||||
cfg.StrOpt('conductor_topic',
|
||||
default='watcher.applier.control',
|
||||
help='The topic name used for'
|
||||
'control events, this topic '
|
||||
'used for rpc call '),
|
||||
cfg.StrOpt('topic_status',
|
||||
cfg.StrOpt('status_topic',
|
||||
default='watcher.applier.status',
|
||||
help='The topic name used for '
|
||||
'status events, this topic '
|
||||
|
@ -67,12 +67,13 @@ class ApplierManager(messaging_core.MessagingCore):
|
|||
def __init__(self):
|
||||
super(ApplierManager, self).__init__(
|
||||
CONF.watcher_applier.publisher_id,
|
||||
CONF.watcher_applier.topic_control,
|
||||
CONF.watcher_applier.topic_status,
|
||||
CONF.watcher_applier.conductor_topic,
|
||||
CONF.watcher_applier.status_topic,
|
||||
api_version=self.API_VERSION,
|
||||
)
|
||||
self.topic_control.add_endpoint(trigger.TriggerActionPlan(self))
|
||||
self.conductor_topic_handler.add_endpoint(
|
||||
trigger.TriggerActionPlan(self))
|
||||
|
||||
def join(self):
|
||||
self.topic_control.join()
|
||||
self.topic_status.join()
|
||||
self.conductor_topic_handler.join()
|
||||
self.status_topic_handler.join()
|
||||
|
|
|
@ -39,17 +39,17 @@ class ApplierAPI(messaging_core.MessagingCore):
|
|||
def __init__(self):
|
||||
super(ApplierAPI, self).__init__(
|
||||
CONF.watcher_applier.publisher_id,
|
||||
CONF.watcher_applier.topic_control,
|
||||
CONF.watcher_applier.topic_status,
|
||||
CONF.watcher_applier.conductor_topic,
|
||||
CONF.watcher_applier.status_topic,
|
||||
api_version=self.API_VERSION,
|
||||
)
|
||||
self.handler = notification.NotificationHandler(self.publisher_id)
|
||||
self.handler.register_observer(self)
|
||||
self.topic_status.add_endpoint(self.handler)
|
||||
self.status_topic_handler.add_endpoint(self.handler)
|
||||
transport = om.get_transport(CONF)
|
||||
|
||||
target = om.Target(
|
||||
topic=CONF.watcher_applier.topic_control,
|
||||
topic=CONF.watcher_applier.conductor_topic,
|
||||
version=self.API_VERSION,
|
||||
)
|
||||
|
||||
|
|
|
@ -62,8 +62,8 @@ class BaseWorkFlowEngine(object):
|
|||
ev.data = {}
|
||||
payload = {'action_uuid': action.uuid,
|
||||
'action_state': state}
|
||||
self.applier_manager.topic_status.publish_event(ev.type.name,
|
||||
payload)
|
||||
self.applier_manager.status_topic_handler.publish_event(
|
||||
ev.type.name, payload)
|
||||
|
||||
@abc.abstractmethod
|
||||
def execute(self, actions):
|
||||
|
|
|
@ -16,58 +16,100 @@
|
|||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
from watcher.common.messaging.events.event_dispatcher import \
|
||||
EventDispatcher
|
||||
from watcher.common.messaging.messaging_handler import \
|
||||
MessagingHandler
|
||||
from watcher.common.rpc import RequestContextSerializer
|
||||
import oslo_messaging as om
|
||||
|
||||
from watcher.objects.base import WatcherObjectSerializer
|
||||
from watcher.common.messaging.events import event_dispatcher as dispatcher
|
||||
from watcher.common.messaging import messaging_handler
|
||||
from watcher.common import rpc
|
||||
|
||||
from watcher.objects import base
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class MessagingCore(EventDispatcher):
|
||||
class MessagingCore(dispatcher.EventDispatcher):
|
||||
|
||||
API_VERSION = '1.0'
|
||||
|
||||
def __init__(self, publisher_id, topic_control, topic_status,
|
||||
def __init__(self, publisher_id, conductor_topic, status_topic,
|
||||
api_version=API_VERSION):
|
||||
super(MessagingCore, self).__init__()
|
||||
self.serializer = RequestContextSerializer(WatcherObjectSerializer())
|
||||
self.serializer = rpc.RequestContextSerializer(
|
||||
base.WatcherObjectSerializer())
|
||||
self.publisher_id = publisher_id
|
||||
self.api_version = api_version
|
||||
self.topic_control = self.build_topic(topic_control)
|
||||
self.topic_status = self.build_topic(topic_status)
|
||||
|
||||
def build_topic(self, topic_name):
|
||||
return MessagingHandler(self.publisher_id, topic_name, self,
|
||||
self.api_version, self.serializer)
|
||||
self.conductor_topic = conductor_topic
|
||||
self.status_topic = status_topic
|
||||
self.conductor_topic_handler = self.build_topic_handler(
|
||||
conductor_topic)
|
||||
self.status_topic_handler = self.build_topic_handler(status_topic)
|
||||
|
||||
self._conductor_client = None
|
||||
self._status_client = None
|
||||
|
||||
@property
|
||||
def conductor_client(self):
|
||||
if self._conductor_client is None:
|
||||
transport = om.get_transport(CONF)
|
||||
target = om.Target(
|
||||
topic=self.conductor_topic,
|
||||
version=self.API_VERSION,
|
||||
)
|
||||
self._conductor_client = om.RPCClient(
|
||||
transport, target, serializer=self.serializer)
|
||||
return self._conductor_client
|
||||
|
||||
@conductor_client.setter
|
||||
def conductor_client(self, c):
|
||||
self.conductor_client = c
|
||||
|
||||
@property
|
||||
def status_client(self):
|
||||
if self._status_client is None:
|
||||
transport = om.get_transport(CONF)
|
||||
target = om.Target(
|
||||
topic=self.status_topic,
|
||||
version=self.API_VERSION,
|
||||
)
|
||||
self._status_client = om.RPCClient(
|
||||
transport, target, serializer=self.serializer)
|
||||
return self._status_client
|
||||
|
||||
@status_client.setter
|
||||
def status_client(self, c):
|
||||
self.status_client = c
|
||||
|
||||
def build_topic_handler(self, topic_name):
|
||||
return messaging_handler.MessagingHandler(
|
||||
self.publisher_id, topic_name, self,
|
||||
self.api_version, self.serializer)
|
||||
|
||||
def connect(self):
|
||||
LOG.debug("Connecting to '%s' (%s)",
|
||||
CONF.transport_url, CONF.rpc_backend)
|
||||
self.topic_control.start()
|
||||
self.topic_status.start()
|
||||
self.conductor_topic_handler.start()
|
||||
self.status_topic_handler.start()
|
||||
|
||||
def disconnect(self):
|
||||
LOG.debug("Disconnecting from '%s' (%s)",
|
||||
CONF.transport_url, CONF.rpc_backend)
|
||||
self.topic_control.stop()
|
||||
self.topic_status.stop()
|
||||
self.conductor_topic_handler.stop()
|
||||
self.status_topic_handler.stop()
|
||||
|
||||
def publish_control(self, event, payload):
|
||||
return self.topic_control.publish_event(event, payload)
|
||||
return self.conductor_topic_handler.publish_event(event, payload)
|
||||
|
||||
def publish_status(self, event, payload, request_id=None):
|
||||
return self.topic_status.publish_event(event, payload, request_id)
|
||||
return self.status_topic_handler.publish_event(
|
||||
event, payload, request_id)
|
||||
|
||||
def get_version(self):
|
||||
return self.api_version
|
||||
|
||||
def check_api_version(self, context):
|
||||
api_manager_version = self.client.call(
|
||||
api_manager_version = self.conductor_client.call(
|
||||
context.to_dict(), 'check_api_version',
|
||||
api_version=self.api_version)
|
||||
return api_manager_version
|
||||
|
|
|
@ -38,11 +38,11 @@ CONF = cfg.CONF
|
|||
|
||||
class MessagingHandler(threading.Thread):
|
||||
|
||||
def __init__(self, publisher_id, topic_watcher, endpoint, version,
|
||||
def __init__(self, publisher_id, topic_name, endpoint, version,
|
||||
serializer=None):
|
||||
super(MessagingHandler, self).__init__()
|
||||
self.publisher_id = publisher_id
|
||||
self.topic_watcher = topic_watcher
|
||||
self.topic_name = topic_name
|
||||
self.__endpoints = []
|
||||
self.__serializer = serializer
|
||||
self.__version = version
|
||||
|
@ -72,7 +72,7 @@ class MessagingHandler(threading.Thread):
|
|||
return om.Notifier(
|
||||
self.__transport,
|
||||
publisher_id=self.publisher_id,
|
||||
topic=self.topic_watcher,
|
||||
topic=self.topic_name,
|
||||
serializer=serializer
|
||||
)
|
||||
|
||||
|
@ -87,7 +87,7 @@ class MessagingHandler(threading.Thread):
|
|||
self.__notifier = self.build_notifier()
|
||||
if len(self.__endpoints):
|
||||
target = om.Target(
|
||||
topic=self.topic_watcher,
|
||||
topic=self.topic_name,
|
||||
# For compatibility, we can override it with 'host' opt
|
||||
server=CONF.host or socket.getfqdn(),
|
||||
version=self.__version,
|
||||
|
@ -101,7 +101,7 @@ class MessagingHandler(threading.Thread):
|
|||
LOG.error(_LE("Messaging configuration error"))
|
||||
|
||||
def run(self):
|
||||
LOG.debug("configure MessagingHandler for %s" % self.topic_watcher)
|
||||
LOG.debug("configure MessagingHandler for %s" % self.topic_name)
|
||||
self._configure()
|
||||
if len(self.__endpoints) > 0:
|
||||
LOG.debug("Starting up server")
|
||||
|
|
|
@ -18,17 +18,16 @@ import eventlet
|
|||
from oslo_log import log
|
||||
import oslo_messaging as messaging
|
||||
|
||||
from watcher.common.messaging.utils.observable import \
|
||||
Observable
|
||||
from watcher.common.messaging.utils import observable
|
||||
|
||||
|
||||
eventlet.monkey_patch()
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class NotificationHandler(Observable):
|
||||
class NotificationHandler(observable.Observable):
|
||||
def __init__(self, publisher_id):
|
||||
Observable.__init__(self)
|
||||
super(NotificationHandler, self).__init__()
|
||||
self.publisher_id = publisher_id
|
||||
|
||||
def info(self, ctx, publisher_id, event_type, payload, metadata):
|
||||
|
|
|
@ -14,19 +14,14 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_log import log
|
||||
|
||||
from watcher.common.messaging.utils.synchronization import \
|
||||
Synchronization
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
from watcher.common.messaging.utils import synchronization
|
||||
|
||||
|
||||
class Observable(Synchronization):
|
||||
class Observable(synchronization.Synchronization):
|
||||
def __init__(self):
|
||||
super(Observable, self).__init__()
|
||||
self.__observers = []
|
||||
self.changed = 0
|
||||
Synchronization.__init__(self)
|
||||
|
||||
def set_changed(self):
|
||||
self.changed = 1
|
||||
|
|
|
@ -54,8 +54,8 @@ class DefaultAuditHandler(base.BaseAuditHandler):
|
|||
event.data = {}
|
||||
payload = {'audit_uuid': audit_uuid,
|
||||
'audit_status': status}
|
||||
self.messaging.topic_status.publish_event(event.type.name,
|
||||
payload)
|
||||
self.messaging.status_topic_handler.publish_event(
|
||||
event.type.name, payload)
|
||||
|
||||
def update_audit_state(self, request_context, audit_uuid, state):
|
||||
LOG.debug("Update audit state: %s", state)
|
||||
|
|
|
@ -40,20 +40,20 @@ See :doc:`../architecture` for more details on this component.
|
|||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
|
||||
from watcher.common.messaging.messaging_core import MessagingCore
|
||||
from watcher.decision_engine.messaging.audit_endpoint import AuditEndpoint
|
||||
from watcher.common.messaging import messaging_core
|
||||
from watcher.decision_engine.messaging import audit_endpoint
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
|
||||
WATCHER_DECISION_ENGINE_OPTS = [
|
||||
cfg.StrOpt('topic_control',
|
||||
cfg.StrOpt('conductor_topic',
|
||||
default='watcher.decision.control',
|
||||
help='The topic name used for'
|
||||
'control events, this topic '
|
||||
'used for rpc call '),
|
||||
cfg.StrOpt('topic_status',
|
||||
cfg.StrOpt('status_topic',
|
||||
default='watcher.decision.status',
|
||||
help='The topic name used for '
|
||||
'status events, this topic '
|
||||
|
@ -78,18 +78,18 @@ CONF.register_group(decision_engine_opt_group)
|
|||
CONF.register_opts(WATCHER_DECISION_ENGINE_OPTS, decision_engine_opt_group)
|
||||
|
||||
|
||||
class DecisionEngineManager(MessagingCore):
|
||||
class DecisionEngineManager(messaging_core.MessagingCore):
|
||||
def __init__(self):
|
||||
super(DecisionEngineManager, self).__init__(
|
||||
CONF.watcher_decision_engine.publisher_id,
|
||||
CONF.watcher_decision_engine.topic_control,
|
||||
CONF.watcher_decision_engine.topic_status,
|
||||
CONF.watcher_decision_engine.conductor_topic,
|
||||
CONF.watcher_decision_engine.status_topic,
|
||||
api_version=self.API_VERSION)
|
||||
endpoint = AuditEndpoint(self,
|
||||
max_workers=CONF.watcher_decision_engine.
|
||||
max_workers)
|
||||
self.topic_control.add_endpoint(endpoint)
|
||||
endpoint = audit_endpoint.AuditEndpoint(
|
||||
self,
|
||||
max_workers=CONF.watcher_decision_engine.max_workers)
|
||||
self.conductor_topic_handler.add_endpoint(endpoint)
|
||||
|
||||
def join(self):
|
||||
self.topic_control.join()
|
||||
self.topic_status.join()
|
||||
self.conductor_topic_handler.join()
|
||||
self.status_topic_handler.join()
|
||||
|
|
|
@ -19,10 +19,10 @@
|
|||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
import oslo_messaging as om
|
||||
|
||||
from watcher.common import exception
|
||||
from watcher.common.messaging.messaging_core import MessagingCore
|
||||
from watcher.common.messaging.notification_handler import NotificationHandler
|
||||
from watcher.common import utils
|
||||
from watcher.decision_engine.manager import decision_engine_opt_group
|
||||
from watcher.decision_engine.manager import WATCHER_DECISION_ENGINE_OPTS
|
||||
|
@ -40,22 +40,16 @@ class DecisionEngineAPI(MessagingCore):
|
|||
def __init__(self):
|
||||
super(DecisionEngineAPI, self).__init__(
|
||||
CONF.watcher_decision_engine.publisher_id,
|
||||
CONF.watcher_decision_engine.topic_control,
|
||||
CONF.watcher_decision_engine.topic_status,
|
||||
CONF.watcher_decision_engine.conductor_topic,
|
||||
CONF.watcher_decision_engine.status_topic,
|
||||
api_version=self.API_VERSION,
|
||||
)
|
||||
|
||||
transport = om.get_transport(CONF)
|
||||
target = om.Target(
|
||||
topic=CONF.watcher_decision_engine.topic_control,
|
||||
version=self.API_VERSION,
|
||||
)
|
||||
self.client = om.RPCClient(transport, target,
|
||||
serializer=self.serializer)
|
||||
self.handler = NotificationHandler(self.publisher_id)
|
||||
self.status_topic_handler.add_endpoint(self.handler)
|
||||
|
||||
def trigger_audit(self, context, audit_uuid=None):
|
||||
if not utils.is_uuid_like(audit_uuid):
|
||||
raise exception.InvalidUuidOrName(name=audit_uuid)
|
||||
|
||||
return self.client.call(
|
||||
return self.conductor_client.call(
|
||||
context.to_dict(), 'trigger_audit', audit_uuid=audit_uuid)
|
||||
|
|
|
@ -15,9 +15,8 @@
|
|||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
from mock import call
|
||||
from mock import MagicMock
|
||||
|
||||
import mock
|
||||
|
||||
from watcher.applier.action_plan.default import DefaultActionPlanHandler
|
||||
from watcher.applier.messaging.event_types import EventTypes
|
||||
|
@ -34,7 +33,7 @@ class TestDefaultActionPlanHandler(DbTestCase):
|
|||
self.context)
|
||||
|
||||
def test_launch_action_plan(self):
|
||||
command = DefaultActionPlanHandler(self.context, MagicMock(),
|
||||
command = DefaultActionPlanHandler(self.context, mock.MagicMock(),
|
||||
self.action_plan.uuid)
|
||||
command.execute()
|
||||
action_plan = ActionPlan.get_by_uuid(self.context,
|
||||
|
@ -42,18 +41,19 @@ class TestDefaultActionPlanHandler(DbTestCase):
|
|||
self.assertEqual(ap_objects.State.SUCCEEDED, action_plan.state)
|
||||
|
||||
def test_trigger_audit_send_notification(self):
|
||||
messaging = MagicMock()
|
||||
messaging = mock.MagicMock()
|
||||
command = DefaultActionPlanHandler(self.context, messaging,
|
||||
self.action_plan.uuid)
|
||||
command.execute()
|
||||
|
||||
call_on_going = call(EventTypes.LAUNCH_ACTION_PLAN.name, {
|
||||
call_on_going = mock.call(EventTypes.LAUNCH_ACTION_PLAN.name, {
|
||||
'action_plan_state': ap_objects.State.ONGOING,
|
||||
'action_plan__uuid': self.action_plan.uuid})
|
||||
call_succeeded = call(EventTypes.LAUNCH_ACTION_PLAN.name, {
|
||||
call_succeeded = mock.call(EventTypes.LAUNCH_ACTION_PLAN.name, {
|
||||
'action_plan_state': ap_objects.State.SUCCEEDED,
|
||||
'action_plan__uuid': self.action_plan.uuid})
|
||||
|
||||
calls = [call_on_going, call_succeeded]
|
||||
messaging.topic_status.publish_event.assert_has_calls(calls)
|
||||
self.assertEqual(2, messaging.topic_status.publish_event.call_count)
|
||||
messaging.status_topic_handler.publish_event.assert_has_calls(calls)
|
||||
self.assertEqual(
|
||||
2, messaging.status_topic_handler.publish_event.call_count)
|
||||
|
|
|
@ -15,59 +15,79 @@
|
|||
# limitations under the License.
|
||||
|
||||
|
||||
from mock import patch
|
||||
import mock
|
||||
|
||||
from watcher.common.messaging.messaging_core import MessagingCore
|
||||
from watcher.common.messaging.messaging_handler import MessagingHandler
|
||||
from watcher.common.rpc import RequestContextSerializer
|
||||
from watcher.tests.base import TestCase
|
||||
from watcher.common.messaging import messaging_core
|
||||
from watcher.common.messaging import messaging_handler
|
||||
from watcher.common import rpc
|
||||
from watcher.tests import base
|
||||
|
||||
|
||||
class TestMessagingCore(TestCase):
|
||||
class TestMessagingCore(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestMessagingCore, self).setUp()
|
||||
|
||||
def test_build_topic(self):
|
||||
@mock.patch.object(messaging_handler, "MessagingHandler")
|
||||
def test_connect(self, m_handler):
|
||||
messaging = messaging_core.MessagingCore("", "", "")
|
||||
messaging.connect()
|
||||
self.assertEqual(m_handler.call_count, 2)
|
||||
|
||||
@mock.patch.object(messaging_handler, "MessagingHandler")
|
||||
def test_disconnect(self, m_handler):
|
||||
messaging = messaging_core.MessagingCore("", "", "")
|
||||
messaging.disconnect()
|
||||
self.assertEqual(m_handler.call_count, 2)
|
||||
|
||||
def test_build_topic_handler(self):
|
||||
topic_name = "MyTopic"
|
||||
messaging = MessagingCore("", "", "")
|
||||
messaging_handler = messaging.build_topic(topic_name)
|
||||
self.assertIsNotNone(messaging_handler)
|
||||
messaging = messaging_core.MessagingCore("", "", "")
|
||||
handler = messaging.build_topic_handler(topic_name)
|
||||
self.assertIsNotNone(handler)
|
||||
|
||||
def test_init_messaging_core(self):
|
||||
messaging = MessagingCore("", "", "")
|
||||
messaging = messaging_core.MessagingCore("", "", "")
|
||||
self.assertIsInstance(messaging.serializer,
|
||||
RequestContextSerializer)
|
||||
self.assertIsInstance(messaging.topic_control, MessagingHandler)
|
||||
self.assertIsInstance(messaging.topic_status, MessagingHandler)
|
||||
rpc.RequestContextSerializer)
|
||||
self.assertIsInstance(
|
||||
messaging.conductor_topic_handler,
|
||||
messaging_handler.MessagingHandler)
|
||||
self.assertIsInstance(
|
||||
messaging.status_topic_handler,
|
||||
messaging_handler.MessagingHandler)
|
||||
|
||||
@patch.object(MessagingCore, 'publish_control')
|
||||
def test_publish_control(self, mock_call):
|
||||
@mock.patch.object(messaging_handler, "MessagingHandler")
|
||||
def test_publish_control(self, m_handler_cls):
|
||||
m_handler = mock.Mock()
|
||||
m_handler_cls.return_value = m_handler
|
||||
payload = {
|
||||
"name": "value",
|
||||
}
|
||||
event = "MyEvent"
|
||||
messaging = MessagingCore("", "", "")
|
||||
messaging = messaging_core.MessagingCore("", "", "")
|
||||
messaging.publish_control(event, payload)
|
||||
mock_call.assert_called_once_with(event, payload)
|
||||
m_handler.publish_event.assert_called_once_with(event, payload)
|
||||
|
||||
@patch.object(MessagingCore, 'publish_status')
|
||||
def test_publish_status(self, mock_call):
|
||||
@mock.patch.object(messaging_handler, "MessagingHandler")
|
||||
def test_publish_status(self, m_handler_cls):
|
||||
m_handler = mock.Mock()
|
||||
m_handler_cls.return_value = m_handler
|
||||
payload = {
|
||||
"name": "value",
|
||||
}
|
||||
event = "MyEvent"
|
||||
messaging = MessagingCore("", "", "")
|
||||
messaging = messaging_core.MessagingCore("", "", "")
|
||||
messaging.publish_status(event, payload)
|
||||
mock_call.assert_called_once_with(event, payload)
|
||||
m_handler.publish_event.assert_called_once_with(event, payload, None)
|
||||
|
||||
@patch.object(MessagingCore, 'publish_status')
|
||||
@mock.patch.object(messaging_core.MessagingCore, 'publish_status')
|
||||
def test_response(self, mock_call):
|
||||
event = "My event"
|
||||
context = {'request_id': 12}
|
||||
message = "My Message"
|
||||
|
||||
messaging = MessagingCore("", "", "")
|
||||
messaging = messaging_core.MessagingCore("", "", "")
|
||||
messaging.response(event, context, message)
|
||||
|
||||
expected_payload = {
|
||||
|
@ -76,13 +96,15 @@ class TestMessagingCore(TestCase):
|
|||
}
|
||||
mock_call.assert_called_once_with(event, expected_payload)
|
||||
|
||||
def test_messaging_build_topic(self):
|
||||
messaging = MessagingCore("pub_id", "test_topic", "does not matter")
|
||||
topic = messaging.build_topic("test_topic")
|
||||
def test_messaging_build_topic_handler(self):
|
||||
messaging = messaging_core.MessagingCore(
|
||||
"pub_id", "test_topic", "does not matter")
|
||||
topic = messaging.build_topic_handler("test_topic")
|
||||
|
||||
self.assertIsInstance(topic, MessagingHandler)
|
||||
self.assertIsInstance(topic, messaging_handler.MessagingHandler)
|
||||
self.assertEqual(messaging.publisher_id, "pub_id")
|
||||
self.assertEqual(topic.publisher_id, "pub_id")
|
||||
|
||||
self.assertEqual(messaging.topic_control.topic_watcher, "test_topic")
|
||||
self.assertEqual(topic.topic_watcher, "test_topic")
|
||||
self.assertEqual(
|
||||
messaging.conductor_topic_handler.topic_name, "test_topic")
|
||||
self.assertEqual(topic.topic_name, "test_topic")
|
||||
|
|
|
@ -14,17 +14,16 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from mock import Mock
|
||||
from mock import patch
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
import oslo_messaging as messaging
|
||||
from watcher.common.messaging.messaging_handler import MessagingHandler
|
||||
from watcher.tests.base import TestCase
|
||||
from watcher.common.messaging import messaging_handler
|
||||
from watcher.tests import base
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class TestMessagingHandler(TestCase):
|
||||
class TestMessagingHandler(base.TestCase):
|
||||
|
||||
PUBLISHER_ID = 'TEST_API'
|
||||
TOPIC_WATCHER = 'TEST_TOPIC_WATCHER'
|
||||
|
@ -35,20 +34,20 @@ class TestMessagingHandler(TestCase):
|
|||
super(TestMessagingHandler, self).setUp()
|
||||
CONF.set_default('host', 'fake-fqdn')
|
||||
|
||||
@patch.object(messaging, "get_rpc_server")
|
||||
@patch.object(messaging, "Target")
|
||||
@mock.patch.object(messaging, "get_rpc_server")
|
||||
@mock.patch.object(messaging, "Target")
|
||||
def test_setup_messaging_handler(self, m_target_cls, m_get_rpc_server):
|
||||
m_target = Mock()
|
||||
m_target = mock.Mock()
|
||||
m_target_cls.return_value = m_target
|
||||
messaging_handler = MessagingHandler(
|
||||
handler = messaging_handler.MessagingHandler(
|
||||
publisher_id=self.PUBLISHER_ID,
|
||||
topic_watcher=self.TOPIC_WATCHER,
|
||||
topic_name=self.TOPIC_WATCHER,
|
||||
endpoint=self.ENDPOINT,
|
||||
version=self.VERSION,
|
||||
serializer=None,
|
||||
)
|
||||
|
||||
messaging_handler.run()
|
||||
handler.run()
|
||||
|
||||
m_target_cls.assert_called_once_with(
|
||||
server="fake-fqdn",
|
||||
|
@ -56,23 +55,23 @@ class TestMessagingHandler(TestCase):
|
|||
version="1.0",
|
||||
)
|
||||
m_get_rpc_server.assert_called_once_with(
|
||||
messaging_handler.transport,
|
||||
handler.transport,
|
||||
m_target,
|
||||
[self.ENDPOINT],
|
||||
serializer=None,
|
||||
)
|
||||
|
||||
def test_messaging_handler_remove_endpoint(self):
|
||||
messaging_handler = MessagingHandler(
|
||||
handler = messaging_handler.MessagingHandler(
|
||||
publisher_id=self.PUBLISHER_ID,
|
||||
topic_watcher=self.TOPIC_WATCHER,
|
||||
topic_name=self.TOPIC_WATCHER,
|
||||
endpoint=self.ENDPOINT,
|
||||
version=self.VERSION,
|
||||
serializer=None,
|
||||
)
|
||||
|
||||
self.assertEqual(messaging_handler.endpoints, [self.ENDPOINT])
|
||||
self.assertEqual(handler.endpoints, [self.ENDPOINT])
|
||||
|
||||
messaging_handler.remove_endpoint(self.ENDPOINT)
|
||||
handler.remove_endpoint(self.ENDPOINT)
|
||||
|
||||
self.assertEqual(messaging_handler.endpoints, [])
|
||||
self.assertEqual(handler.endpoints, [])
|
||||
|
|
|
@ -63,5 +63,6 @@ class TestDefaultAuditHandler(base.DbTestCase):
|
|||
'audit_uuid': self.audit.uuid})
|
||||
|
||||
calls = [call_on_going, call_succeeded]
|
||||
messaging.topic_status.publish_event.assert_has_calls(calls)
|
||||
self.assertEqual(2, messaging.topic_status.publish_event.call_count)
|
||||
messaging.status_topic_handler.publish_event.assert_has_calls(calls)
|
||||
self.assertEqual(
|
||||
2, messaging.status_topic_handler.publish_event.call_count)
|
||||
|
|
|
@ -79,7 +79,7 @@ class MyObj2(object):
|
|||
pass
|
||||
|
||||
|
||||
class TestSubclassedObject(MyObj):
|
||||
class DummySubclassedObject(MyObj):
|
||||
fields = {'new_field': str}
|
||||
|
||||
|
||||
|
@ -438,13 +438,13 @@ class _TestObject(object):
|
|||
base_fields = base.WatcherObject.fields.keys()
|
||||
myobj_fields = ['foo', 'bar', 'missing'] + base_fields
|
||||
myobj3_fields = ['new_field']
|
||||
self.assertTrue(issubclass(TestSubclassedObject, MyObj))
|
||||
self.assertTrue(issubclass(DummySubclassedObject, MyObj))
|
||||
self.assertEqual(len(myobj_fields), len(MyObj.fields))
|
||||
self.assertEqual(set(myobj_fields), set(MyObj.fields.keys()))
|
||||
self.assertEqual(len(myobj_fields) + len(myobj3_fields),
|
||||
len(TestSubclassedObject.fields))
|
||||
len(DummySubclassedObject.fields))
|
||||
self.assertEqual(set(myobj_fields) | set(myobj3_fields),
|
||||
set(TestSubclassedObject.fields.keys()))
|
||||
set(DummySubclassedObject.fields.keys()))
|
||||
|
||||
def test_get_changes(self):
|
||||
obj = MyObj(self.context)
|
||||
|
|
Loading…
Reference in New Issue