Define self.client in MessagingCore

Currently self.client is referenced within MessagingCore,
but no definition is made in its constructor. Additionally
self.client is defined in children classes of MessagingCore.
This patchset defines self.client in the constructor of
MessagingCore and removes the redefinition in its children.

-self.client lazily loaded

Co-Authored-By: v-francoise <Vincent.FRANCOISE@b-com.com>
Change-Id: I14525a175bf1ebde3d2636024ad2f2219c79d6e1
Closes-Bug: #1521636
This commit is contained in:
Darren Shaw 2015-12-05 18:53:44 -06:00 committed by Vincent Françoise
parent 7406a1e713
commit 2f0c1c12cf
16 changed files with 195 additions and 142 deletions

View File

@ -43,8 +43,8 @@ class DefaultActionPlanHandler(base.BaseActionPlanHandler):
ev.data = {}
payload = {'action_plan__uuid': uuid,
'action_plan_state': state}
self.applier_manager.topic_status.publish_event(ev.type.name,
payload)
self.applier_manager.status_topic_handler.publish_event(
ev.type.name, payload)
def execute(self):
try:

View File

@ -34,12 +34,12 @@ APPLIER_MANAGER_OPTS = [
min=1,
required=True,
help='Number of workers for applier, default value is 1.'),
cfg.StrOpt('topic_control',
cfg.StrOpt('conductor_topic',
default='watcher.applier.control',
help='The topic name used for'
'control events, this topic '
'used for rpc call '),
cfg.StrOpt('topic_status',
cfg.StrOpt('status_topic',
default='watcher.applier.status',
help='The topic name used for '
'status events, this topic '
@ -67,12 +67,13 @@ class ApplierManager(messaging_core.MessagingCore):
def __init__(self):
super(ApplierManager, self).__init__(
CONF.watcher_applier.publisher_id,
CONF.watcher_applier.topic_control,
CONF.watcher_applier.topic_status,
CONF.watcher_applier.conductor_topic,
CONF.watcher_applier.status_topic,
api_version=self.API_VERSION,
)
self.topic_control.add_endpoint(trigger.TriggerActionPlan(self))
self.conductor_topic_handler.add_endpoint(
trigger.TriggerActionPlan(self))
def join(self):
self.topic_control.join()
self.topic_status.join()
self.conductor_topic_handler.join()
self.status_topic_handler.join()

View File

@ -39,17 +39,17 @@ class ApplierAPI(messaging_core.MessagingCore):
def __init__(self):
super(ApplierAPI, self).__init__(
CONF.watcher_applier.publisher_id,
CONF.watcher_applier.topic_control,
CONF.watcher_applier.topic_status,
CONF.watcher_applier.conductor_topic,
CONF.watcher_applier.status_topic,
api_version=self.API_VERSION,
)
self.handler = notification.NotificationHandler(self.publisher_id)
self.handler.register_observer(self)
self.topic_status.add_endpoint(self.handler)
self.status_topic_handler.add_endpoint(self.handler)
transport = om.get_transport(CONF)
target = om.Target(
topic=CONF.watcher_applier.topic_control,
topic=CONF.watcher_applier.conductor_topic,
version=self.API_VERSION,
)

View File

@ -62,8 +62,8 @@ class BaseWorkFlowEngine(object):
ev.data = {}
payload = {'action_uuid': action.uuid,
'action_state': state}
self.applier_manager.topic_status.publish_event(ev.type.name,
payload)
self.applier_manager.status_topic_handler.publish_event(
ev.type.name, payload)
@abc.abstractmethod
def execute(self, actions):

View File

@ -16,58 +16,100 @@
from oslo_config import cfg
from oslo_log import log
from watcher.common.messaging.events.event_dispatcher import \
EventDispatcher
from watcher.common.messaging.messaging_handler import \
MessagingHandler
from watcher.common.rpc import RequestContextSerializer
import oslo_messaging as om
from watcher.objects.base import WatcherObjectSerializer
from watcher.common.messaging.events import event_dispatcher as dispatcher
from watcher.common.messaging import messaging_handler
from watcher.common import rpc
from watcher.objects import base
LOG = log.getLogger(__name__)
CONF = cfg.CONF
class MessagingCore(EventDispatcher):
class MessagingCore(dispatcher.EventDispatcher):
API_VERSION = '1.0'
def __init__(self, publisher_id, topic_control, topic_status,
def __init__(self, publisher_id, conductor_topic, status_topic,
api_version=API_VERSION):
super(MessagingCore, self).__init__()
self.serializer = RequestContextSerializer(WatcherObjectSerializer())
self.serializer = rpc.RequestContextSerializer(
base.WatcherObjectSerializer())
self.publisher_id = publisher_id
self.api_version = api_version
self.topic_control = self.build_topic(topic_control)
self.topic_status = self.build_topic(topic_status)
def build_topic(self, topic_name):
return MessagingHandler(self.publisher_id, topic_name, self,
self.api_version, self.serializer)
self.conductor_topic = conductor_topic
self.status_topic = status_topic
self.conductor_topic_handler = self.build_topic_handler(
conductor_topic)
self.status_topic_handler = self.build_topic_handler(status_topic)
self._conductor_client = None
self._status_client = None
@property
def conductor_client(self):
if self._conductor_client is None:
transport = om.get_transport(CONF)
target = om.Target(
topic=self.conductor_topic,
version=self.API_VERSION,
)
self._conductor_client = om.RPCClient(
transport, target, serializer=self.serializer)
return self._conductor_client
@conductor_client.setter
def conductor_client(self, c):
self.conductor_client = c
@property
def status_client(self):
if self._status_client is None:
transport = om.get_transport(CONF)
target = om.Target(
topic=self.status_topic,
version=self.API_VERSION,
)
self._status_client = om.RPCClient(
transport, target, serializer=self.serializer)
return self._status_client
@status_client.setter
def status_client(self, c):
self.status_client = c
def build_topic_handler(self, topic_name):
return messaging_handler.MessagingHandler(
self.publisher_id, topic_name, self,
self.api_version, self.serializer)
def connect(self):
LOG.debug("Connecting to '%s' (%s)",
CONF.transport_url, CONF.rpc_backend)
self.topic_control.start()
self.topic_status.start()
self.conductor_topic_handler.start()
self.status_topic_handler.start()
def disconnect(self):
LOG.debug("Disconnecting from '%s' (%s)",
CONF.transport_url, CONF.rpc_backend)
self.topic_control.stop()
self.topic_status.stop()
self.conductor_topic_handler.stop()
self.status_topic_handler.stop()
def publish_control(self, event, payload):
return self.topic_control.publish_event(event, payload)
return self.conductor_topic_handler.publish_event(event, payload)
def publish_status(self, event, payload, request_id=None):
return self.topic_status.publish_event(event, payload, request_id)
return self.status_topic_handler.publish_event(
event, payload, request_id)
def get_version(self):
return self.api_version
def check_api_version(self, context):
api_manager_version = self.client.call(
api_manager_version = self.conductor_client.call(
context.to_dict(), 'check_api_version',
api_version=self.api_version)
return api_manager_version

View File

@ -38,11 +38,11 @@ CONF = cfg.CONF
class MessagingHandler(threading.Thread):
def __init__(self, publisher_id, topic_watcher, endpoint, version,
def __init__(self, publisher_id, topic_name, endpoint, version,
serializer=None):
super(MessagingHandler, self).__init__()
self.publisher_id = publisher_id
self.topic_watcher = topic_watcher
self.topic_name = topic_name
self.__endpoints = []
self.__serializer = serializer
self.__version = version
@ -72,7 +72,7 @@ class MessagingHandler(threading.Thread):
return om.Notifier(
self.__transport,
publisher_id=self.publisher_id,
topic=self.topic_watcher,
topic=self.topic_name,
serializer=serializer
)
@ -87,7 +87,7 @@ class MessagingHandler(threading.Thread):
self.__notifier = self.build_notifier()
if len(self.__endpoints):
target = om.Target(
topic=self.topic_watcher,
topic=self.topic_name,
# For compatibility, we can override it with 'host' opt
server=CONF.host or socket.getfqdn(),
version=self.__version,
@ -101,7 +101,7 @@ class MessagingHandler(threading.Thread):
LOG.error(_LE("Messaging configuration error"))
def run(self):
LOG.debug("configure MessagingHandler for %s" % self.topic_watcher)
LOG.debug("configure MessagingHandler for %s" % self.topic_name)
self._configure()
if len(self.__endpoints) > 0:
LOG.debug("Starting up server")

View File

@ -18,17 +18,16 @@ import eventlet
from oslo_log import log
import oslo_messaging as messaging
from watcher.common.messaging.utils.observable import \
Observable
from watcher.common.messaging.utils import observable
eventlet.monkey_patch()
LOG = log.getLogger(__name__)
class NotificationHandler(Observable):
class NotificationHandler(observable.Observable):
def __init__(self, publisher_id):
Observable.__init__(self)
super(NotificationHandler, self).__init__()
self.publisher_id = publisher_id
def info(self, ctx, publisher_id, event_type, payload, metadata):

View File

@ -14,19 +14,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log
from watcher.common.messaging.utils.synchronization import \
Synchronization
LOG = log.getLogger(__name__)
from watcher.common.messaging.utils import synchronization
class Observable(Synchronization):
class Observable(synchronization.Synchronization):
def __init__(self):
super(Observable, self).__init__()
self.__observers = []
self.changed = 0
Synchronization.__init__(self)
def set_changed(self):
self.changed = 1

View File

@ -54,8 +54,8 @@ class DefaultAuditHandler(base.BaseAuditHandler):
event.data = {}
payload = {'audit_uuid': audit_uuid,
'audit_status': status}
self.messaging.topic_status.publish_event(event.type.name,
payload)
self.messaging.status_topic_handler.publish_event(
event.type.name, payload)
def update_audit_state(self, request_context, audit_uuid, state):
LOG.debug("Update audit state: %s", state)

View File

@ -40,20 +40,20 @@ See :doc:`../architecture` for more details on this component.
from oslo_config import cfg
from oslo_log import log
from watcher.common.messaging.messaging_core import MessagingCore
from watcher.decision_engine.messaging.audit_endpoint import AuditEndpoint
from watcher.common.messaging import messaging_core
from watcher.decision_engine.messaging import audit_endpoint
LOG = log.getLogger(__name__)
CONF = cfg.CONF
WATCHER_DECISION_ENGINE_OPTS = [
cfg.StrOpt('topic_control',
cfg.StrOpt('conductor_topic',
default='watcher.decision.control',
help='The topic name used for'
'control events, this topic '
'used for rpc call '),
cfg.StrOpt('topic_status',
cfg.StrOpt('status_topic',
default='watcher.decision.status',
help='The topic name used for '
'status events, this topic '
@ -78,18 +78,18 @@ CONF.register_group(decision_engine_opt_group)
CONF.register_opts(WATCHER_DECISION_ENGINE_OPTS, decision_engine_opt_group)
class DecisionEngineManager(MessagingCore):
class DecisionEngineManager(messaging_core.MessagingCore):
def __init__(self):
super(DecisionEngineManager, self).__init__(
CONF.watcher_decision_engine.publisher_id,
CONF.watcher_decision_engine.topic_control,
CONF.watcher_decision_engine.topic_status,
CONF.watcher_decision_engine.conductor_topic,
CONF.watcher_decision_engine.status_topic,
api_version=self.API_VERSION)
endpoint = AuditEndpoint(self,
max_workers=CONF.watcher_decision_engine.
max_workers)
self.topic_control.add_endpoint(endpoint)
endpoint = audit_endpoint.AuditEndpoint(
self,
max_workers=CONF.watcher_decision_engine.max_workers)
self.conductor_topic_handler.add_endpoint(endpoint)
def join(self):
self.topic_control.join()
self.topic_status.join()
self.conductor_topic_handler.join()
self.status_topic_handler.join()

View File

@ -19,10 +19,10 @@
from oslo_config import cfg
from oslo_log import log
import oslo_messaging as om
from watcher.common import exception
from watcher.common.messaging.messaging_core import MessagingCore
from watcher.common.messaging.notification_handler import NotificationHandler
from watcher.common import utils
from watcher.decision_engine.manager import decision_engine_opt_group
from watcher.decision_engine.manager import WATCHER_DECISION_ENGINE_OPTS
@ -40,22 +40,16 @@ class DecisionEngineAPI(MessagingCore):
def __init__(self):
super(DecisionEngineAPI, self).__init__(
CONF.watcher_decision_engine.publisher_id,
CONF.watcher_decision_engine.topic_control,
CONF.watcher_decision_engine.topic_status,
CONF.watcher_decision_engine.conductor_topic,
CONF.watcher_decision_engine.status_topic,
api_version=self.API_VERSION,
)
transport = om.get_transport(CONF)
target = om.Target(
topic=CONF.watcher_decision_engine.topic_control,
version=self.API_VERSION,
)
self.client = om.RPCClient(transport, target,
serializer=self.serializer)
self.handler = NotificationHandler(self.publisher_id)
self.status_topic_handler.add_endpoint(self.handler)
def trigger_audit(self, context, audit_uuid=None):
if not utils.is_uuid_like(audit_uuid):
raise exception.InvalidUuidOrName(name=audit_uuid)
return self.client.call(
return self.conductor_client.call(
context.to_dict(), 'trigger_audit', audit_uuid=audit_uuid)

View File

@ -15,9 +15,8 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from mock import call
from mock import MagicMock
import mock
from watcher.applier.action_plan.default import DefaultActionPlanHandler
from watcher.applier.messaging.event_types import EventTypes
@ -34,7 +33,7 @@ class TestDefaultActionPlanHandler(DbTestCase):
self.context)
def test_launch_action_plan(self):
command = DefaultActionPlanHandler(self.context, MagicMock(),
command = DefaultActionPlanHandler(self.context, mock.MagicMock(),
self.action_plan.uuid)
command.execute()
action_plan = ActionPlan.get_by_uuid(self.context,
@ -42,18 +41,19 @@ class TestDefaultActionPlanHandler(DbTestCase):
self.assertEqual(ap_objects.State.SUCCEEDED, action_plan.state)
def test_trigger_audit_send_notification(self):
messaging = MagicMock()
messaging = mock.MagicMock()
command = DefaultActionPlanHandler(self.context, messaging,
self.action_plan.uuid)
command.execute()
call_on_going = call(EventTypes.LAUNCH_ACTION_PLAN.name, {
call_on_going = mock.call(EventTypes.LAUNCH_ACTION_PLAN.name, {
'action_plan_state': ap_objects.State.ONGOING,
'action_plan__uuid': self.action_plan.uuid})
call_succeeded = call(EventTypes.LAUNCH_ACTION_PLAN.name, {
call_succeeded = mock.call(EventTypes.LAUNCH_ACTION_PLAN.name, {
'action_plan_state': ap_objects.State.SUCCEEDED,
'action_plan__uuid': self.action_plan.uuid})
calls = [call_on_going, call_succeeded]
messaging.topic_status.publish_event.assert_has_calls(calls)
self.assertEqual(2, messaging.topic_status.publish_event.call_count)
messaging.status_topic_handler.publish_event.assert_has_calls(calls)
self.assertEqual(
2, messaging.status_topic_handler.publish_event.call_count)

View File

@ -15,59 +15,79 @@
# limitations under the License.
from mock import patch
import mock
from watcher.common.messaging.messaging_core import MessagingCore
from watcher.common.messaging.messaging_handler import MessagingHandler
from watcher.common.rpc import RequestContextSerializer
from watcher.tests.base import TestCase
from watcher.common.messaging import messaging_core
from watcher.common.messaging import messaging_handler
from watcher.common import rpc
from watcher.tests import base
class TestMessagingCore(TestCase):
class TestMessagingCore(base.TestCase):
def setUp(self):
super(TestMessagingCore, self).setUp()
def test_build_topic(self):
@mock.patch.object(messaging_handler, "MessagingHandler")
def test_connect(self, m_handler):
messaging = messaging_core.MessagingCore("", "", "")
messaging.connect()
self.assertEqual(m_handler.call_count, 2)
@mock.patch.object(messaging_handler, "MessagingHandler")
def test_disconnect(self, m_handler):
messaging = messaging_core.MessagingCore("", "", "")
messaging.disconnect()
self.assertEqual(m_handler.call_count, 2)
def test_build_topic_handler(self):
topic_name = "MyTopic"
messaging = MessagingCore("", "", "")
messaging_handler = messaging.build_topic(topic_name)
self.assertIsNotNone(messaging_handler)
messaging = messaging_core.MessagingCore("", "", "")
handler = messaging.build_topic_handler(topic_name)
self.assertIsNotNone(handler)
def test_init_messaging_core(self):
messaging = MessagingCore("", "", "")
messaging = messaging_core.MessagingCore("", "", "")
self.assertIsInstance(messaging.serializer,
RequestContextSerializer)
self.assertIsInstance(messaging.topic_control, MessagingHandler)
self.assertIsInstance(messaging.topic_status, MessagingHandler)
rpc.RequestContextSerializer)
self.assertIsInstance(
messaging.conductor_topic_handler,
messaging_handler.MessagingHandler)
self.assertIsInstance(
messaging.status_topic_handler,
messaging_handler.MessagingHandler)
@patch.object(MessagingCore, 'publish_control')
def test_publish_control(self, mock_call):
@mock.patch.object(messaging_handler, "MessagingHandler")
def test_publish_control(self, m_handler_cls):
m_handler = mock.Mock()
m_handler_cls.return_value = m_handler
payload = {
"name": "value",
}
event = "MyEvent"
messaging = MessagingCore("", "", "")
messaging = messaging_core.MessagingCore("", "", "")
messaging.publish_control(event, payload)
mock_call.assert_called_once_with(event, payload)
m_handler.publish_event.assert_called_once_with(event, payload)
@patch.object(MessagingCore, 'publish_status')
def test_publish_status(self, mock_call):
@mock.patch.object(messaging_handler, "MessagingHandler")
def test_publish_status(self, m_handler_cls):
m_handler = mock.Mock()
m_handler_cls.return_value = m_handler
payload = {
"name": "value",
}
event = "MyEvent"
messaging = MessagingCore("", "", "")
messaging = messaging_core.MessagingCore("", "", "")
messaging.publish_status(event, payload)
mock_call.assert_called_once_with(event, payload)
m_handler.publish_event.assert_called_once_with(event, payload, None)
@patch.object(MessagingCore, 'publish_status')
@mock.patch.object(messaging_core.MessagingCore, 'publish_status')
def test_response(self, mock_call):
event = "My event"
context = {'request_id': 12}
message = "My Message"
messaging = MessagingCore("", "", "")
messaging = messaging_core.MessagingCore("", "", "")
messaging.response(event, context, message)
expected_payload = {
@ -76,13 +96,15 @@ class TestMessagingCore(TestCase):
}
mock_call.assert_called_once_with(event, expected_payload)
def test_messaging_build_topic(self):
messaging = MessagingCore("pub_id", "test_topic", "does not matter")
topic = messaging.build_topic("test_topic")
def test_messaging_build_topic_handler(self):
messaging = messaging_core.MessagingCore(
"pub_id", "test_topic", "does not matter")
topic = messaging.build_topic_handler("test_topic")
self.assertIsInstance(topic, MessagingHandler)
self.assertIsInstance(topic, messaging_handler.MessagingHandler)
self.assertEqual(messaging.publisher_id, "pub_id")
self.assertEqual(topic.publisher_id, "pub_id")
self.assertEqual(messaging.topic_control.topic_watcher, "test_topic")
self.assertEqual(topic.topic_watcher, "test_topic")
self.assertEqual(
messaging.conductor_topic_handler.topic_name, "test_topic")
self.assertEqual(topic.topic_name, "test_topic")

View File

@ -14,17 +14,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from mock import Mock
from mock import patch
import mock
from oslo_config import cfg
import oslo_messaging as messaging
from watcher.common.messaging.messaging_handler import MessagingHandler
from watcher.tests.base import TestCase
from watcher.common.messaging import messaging_handler
from watcher.tests import base
CONF = cfg.CONF
class TestMessagingHandler(TestCase):
class TestMessagingHandler(base.TestCase):
PUBLISHER_ID = 'TEST_API'
TOPIC_WATCHER = 'TEST_TOPIC_WATCHER'
@ -35,20 +34,20 @@ class TestMessagingHandler(TestCase):
super(TestMessagingHandler, self).setUp()
CONF.set_default('host', 'fake-fqdn')
@patch.object(messaging, "get_rpc_server")
@patch.object(messaging, "Target")
@mock.patch.object(messaging, "get_rpc_server")
@mock.patch.object(messaging, "Target")
def test_setup_messaging_handler(self, m_target_cls, m_get_rpc_server):
m_target = Mock()
m_target = mock.Mock()
m_target_cls.return_value = m_target
messaging_handler = MessagingHandler(
handler = messaging_handler.MessagingHandler(
publisher_id=self.PUBLISHER_ID,
topic_watcher=self.TOPIC_WATCHER,
topic_name=self.TOPIC_WATCHER,
endpoint=self.ENDPOINT,
version=self.VERSION,
serializer=None,
)
messaging_handler.run()
handler.run()
m_target_cls.assert_called_once_with(
server="fake-fqdn",
@ -56,23 +55,23 @@ class TestMessagingHandler(TestCase):
version="1.0",
)
m_get_rpc_server.assert_called_once_with(
messaging_handler.transport,
handler.transport,
m_target,
[self.ENDPOINT],
serializer=None,
)
def test_messaging_handler_remove_endpoint(self):
messaging_handler = MessagingHandler(
handler = messaging_handler.MessagingHandler(
publisher_id=self.PUBLISHER_ID,
topic_watcher=self.TOPIC_WATCHER,
topic_name=self.TOPIC_WATCHER,
endpoint=self.ENDPOINT,
version=self.VERSION,
serializer=None,
)
self.assertEqual(messaging_handler.endpoints, [self.ENDPOINT])
self.assertEqual(handler.endpoints, [self.ENDPOINT])
messaging_handler.remove_endpoint(self.ENDPOINT)
handler.remove_endpoint(self.ENDPOINT)
self.assertEqual(messaging_handler.endpoints, [])
self.assertEqual(handler.endpoints, [])

View File

@ -63,5 +63,6 @@ class TestDefaultAuditHandler(base.DbTestCase):
'audit_uuid': self.audit.uuid})
calls = [call_on_going, call_succeeded]
messaging.topic_status.publish_event.assert_has_calls(calls)
self.assertEqual(2, messaging.topic_status.publish_event.call_count)
messaging.status_topic_handler.publish_event.assert_has_calls(calls)
self.assertEqual(
2, messaging.status_topic_handler.publish_event.call_count)

View File

@ -79,7 +79,7 @@ class MyObj2(object):
pass
class TestSubclassedObject(MyObj):
class DummySubclassedObject(MyObj):
fields = {'new_field': str}
@ -438,13 +438,13 @@ class _TestObject(object):
base_fields = base.WatcherObject.fields.keys()
myobj_fields = ['foo', 'bar', 'missing'] + base_fields
myobj3_fields = ['new_field']
self.assertTrue(issubclass(TestSubclassedObject, MyObj))
self.assertTrue(issubclass(DummySubclassedObject, MyObj))
self.assertEqual(len(myobj_fields), len(MyObj.fields))
self.assertEqual(set(myobj_fields), set(MyObj.fields.keys()))
self.assertEqual(len(myobj_fields) + len(myobj3_fields),
len(TestSubclassedObject.fields))
len(DummySubclassedObject.fields))
self.assertEqual(set(myobj_fields) | set(myobj3_fields),
set(TestSubclassedObject.fields.keys()))
set(DummySubclassedObject.fields.keys()))
def test_get_changes(self):
obj = MyObj(self.context)