oslo.messaging/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py

129 lines
5.4 KiB
Python

# Copyright 2015-2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import uuid
import six
from oslo_messaging._drivers.zmq_driver.server.consumers \
import zmq_consumer_base
from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_socket
from oslo_messaging._drivers.zmq_driver import zmq_updater
from oslo_messaging._drivers.zmq_driver import zmq_version
from oslo_messaging._i18n import _LE, _LI
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
# zmq binding as returned by zmq_async.import_zmq(); this module reads the
# SUB, SUBSCRIBE and RCVMORE constants and the ZMQError exception from it.
zmq = zmq_async.import_zmq()
class SubConsumer(zmq_consumer_base.ConsumerBase):
    """Consumer that receives published messages over a zmq.SUB socket.

    On construction it creates the SUB socket, subscribes it to the filter
    derived from the server's target, creates a
    SubscriberConnectionUpdater for that socket and registers
    receive_request() with the poller.
    """

    def __init__(self, conf, poller, server):
        """Set up the SUB socket and register it with the poller.

        :param conf: configuration object; ``conf.oslo_messaging_zmq`` is
            read for ``rpc_zmq_host`` when building the socket identity
        :param poller: poller the socket is registered with
        :param server: owning server; its ``matchmaker`` and ``target``
            attributes are used
        """
        super(SubConsumer, self).__init__(conf, poller, server)
        self.matchmaker = SubscriptionMatchmakerWrapper(conf,
                                                        server.matchmaker)
        # NOTE: the target must be set before _generate_identity() is
        # called below, because the identity embeds the subscribe filter.
        self.target = server.target
        self.socket = zmq_socket.ZmqSocket(self.conf, self.context, zmq.SUB,
                                           immediate=False,
                                           identity=self._generate_identity())
        self.sockets.append(self.socket)
        self.host = self.socket.handle.identity
        self._subscribe_to_topic()
        self._receive_request_versions = \
            zmq_version.get_method_versions(self, 'receive_request')
        self.connection_updater = SubscriberConnectionUpdater(
            conf, self.matchmaker, self.socket)
        self.poller.register(self.socket, self.receive_request)
        LOG.info(_LI("[%s] Run SUB consumer"), self.host)

    def _generate_identity(self):
        """Build the socket identity.

        The identity has the form
        ``<rpc_zmq_host>/<subscribe filter>/<random uuid4>``.
        """
        return six.b(self.conf.oslo_messaging_zmq.rpc_zmq_host + '/') + \
            zmq_address.target_to_subscribe_filter(self.target) + \
            six.b('/' + str(uuid.uuid4()))

    def _subscribe_to_topic(self):
        """Subscribe the socket to the filter derived from the target."""
        topic_filter = zmq_address.target_to_subscribe_filter(self.target)
        self.socket.setsockopt(zmq.SUBSCRIBE, topic_filter)
        LOG.debug("[%(host)s] Subscribing to topic %(filter)s",
                  {"host": self.host, "filter": topic_filter})

    def _get_receive_request_version(self, version):
        """Return the receive handler registered for ``version``.

        :param version: message version string read from the socket
        :raises zmq_version.UnsupportedMessageVersionError: if no handler
            is registered for ``version``
        """
        receive_request_version = self._receive_request_versions.get(version)
        if receive_request_version is None:
            raise zmq_version.UnsupportedMessageVersionError(version)
        return receive_request_version

    def _receive_request_v_1_0(self, topic_filter, socket):
        """Read the remaining parts of a version 1.0 message.

        Reads, in order: the message type, the message id and the loaded
        ``(context, message)`` pair.

        :param topic_filter: filter part already read by the caller; used
            for logging only
        :param socket: socket to read the remaining parts from
        :returns: ``(context, message)`` tuple
        :raises ValueError: if the message type part is not an integer
        :raises AssertionError: if the message type is not one of
            ``zmq_names.MULTISEND_TYPES``
        """
        message_type = int(socket.recv())
        # Raised explicitly rather than through an ``assert`` statement so
        # that the check still runs under ``python -O``. The exception type
        # is kept because receive_request() handles AssertionError.
        if message_type not in zmq_names.MULTISEND_TYPES:
            raise AssertionError("Fanout expected!")
        message_id = socket.recv()
        context, message = socket.recv_loaded()
        LOG.debug("[%(host)s] Received on topic %(filter)s message %(msg_id)s "
                  "(v%(msg_version)s)",
                  {'host': self.host,
                   'filter': topic_filter,
                   'msg_id': message_id,
                   'msg_version': '1.0'})
        return context, message

    def receive_request(self, socket):
        """Receive one message from ``socket``.

        Reads the topic filter and the message version, then dispatches to
        the receive handler registered for that version.

        :param socket: socket that has a message ready to be read
        :returns: a ``zmq_incoming_message.ZmqIncomingMessage``, or ``None``
            when receiving failed (the error is logged and any unread parts
            of the message are discarded)
        """
        try:
            topic_filter = socket.recv()
            message_version = socket.recv_string()
            receive_request_version = \
                self._get_receive_request_version(message_version)
            context, message = receive_request_version(topic_filter, socket)
            return zmq_incoming_message.ZmqIncomingMessage(context, message)
        except (zmq.ZMQError, AssertionError, ValueError,
                zmq_version.UnsupportedMessageVersionError) as e:
            LOG.error(_LE("Receiving message failed: %s"), str(e))
            # NOTE(gdavoian): drop the remaining parts of a broken message
            if socket.getsockopt(zmq.RCVMORE):
                socket.recv_multipart()
            return None

    def cleanup(self):
        """Clean up the connection updater, then the base consumer."""
        LOG.info(_LI("[%s] Destroy SUB consumer"), self.host)
        self.connection_updater.cleanup()
        super(SubConsumer, self).cleanup()
class SubscriptionMatchmakerWrapper(object):
    """Matchmaker wrapper that prefers statically configured publishers.

    When ``conf.oslo_messaging_zmq.subscribe_on`` is non-empty those
    publishers are used; otherwise the wrapped matchmaker is queried.
    """

    def __init__(self, conf, matchmaker):
        """Store the configuration and the wrapped matchmaker.

        :param conf: configuration object exposing
            ``oslo_messaging_zmq.subscribe_on``
        :param matchmaker: object providing ``get_publishers()``
        """
        self.conf = conf
        self.matchmaker = matchmaker

    def get_publishers(self):
        """Return the publishers a subscriber should connect to.

        :returns: ``[(publisher, None), ...]`` built from the configured
            ``subscribe_on`` list when it is non-empty, otherwise whatever
            the wrapped matchmaker's ``get_publishers()`` returns
        """
        conf_publishers = self.conf.oslo_messaging_zmq.subscribe_on
        if conf_publishers:
            # Logged only in this branch: the message would be misleading
            # when the result actually comes from the matchmaker.
            LOG.debug("Publishers taken from configuration %s",
                      conf_publishers)
            return [(publisher, None) for publisher in conf_publishers]
        return self.matchmaker.get_publishers()
class SubscriberConnectionUpdater(zmq_updater.ConnectionUpdater):
    """Connection updater that connects a SUB socket to its publishers."""

    def _update_connection(self):
        """Connect the socket to every publisher the matchmaker reports.

        Each entry returned by ``self.matchmaker.get_publishers()`` is
        unpacked as a two-item pair; only the first item (the publisher
        address) is used for connecting.
        """
        known_publishers = self.matchmaker.get_publishers()
        sub_socket = self.socket
        for address, _router_address in known_publishers:
            sub_socket.connect_to_host(address)
        LOG.debug("[%s] SUB consumer connected to publishers %s",
                  self.socket.handle.identity, known_publishers)