# Copyright (C) 2015 Cisco Systems, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. # Following code fixes 2 issues with kafka-python and # The current release of eventlet (0.19.0) does not actually remove # select.poll [1]. Because of kafka-python.selectors34 selects # PollSelector instead of SelectSelector [2]. PollSelector relies on # select.poll, which does not work when eventlet/greenlet is used. This # bug in evenlet is fixed in the master branch [3], but there's no # release of eventlet that includes this fix at this point. import json import threading import kafka from kafka.client_async import selectors import kafka.errors from oslo_config import cfg from oslo_log import log as logging from oslo_utils import eventletutils import tenacity from oslo_messaging._drivers import base from oslo_messaging._drivers import common as driver_common from oslo_messaging._drivers import kafka_options from oslo_messaging._drivers import pool as driver_pool from oslo_messaging._i18n import _LE from oslo_messaging._i18n import _LW from oslo_serialization import jsonutils if eventletutils.is_monkey_patched('select'): # monkeypatch the vendored SelectSelector._select like eventlet does # https://github.com/eventlet/eventlet/blob/master/eventlet/green/selectors.py#L32 from eventlet.green import select selectors.SelectSelector._select = staticmethod(select.select) # Force to use the select selectors KAFKA_SELECTOR = selectors.SelectSelector else: KAFKA_SELECTOR = selectors.DefaultSelector LOG = logging.getLogger(__name__) def unpack_message(msg): context = {} message = None try: if msg: msg = json.loads(msg) message = driver_common.deserialize_msg(msg) if 'context' in message: context = message['context'] del message['context'] except ValueError as e: LOG.info("Invalid format of consumed message: %s" % e) except Exception: LOG.warning(_LW("Exception during message unpacking")) return message, context def pack_message(ctxt, msg): """Pack context into msg.""" if isinstance(ctxt, dict): context_d = ctxt else: context_d = ctxt.to_dict() msg['context'] = context_d msg = driver_common.serialize_msg(msg) return msg def target_to_topic(target, priority=None): """Convert target into topic string :param target: Message destination target :type target: oslo_messaging.Target :param priority: Notification priority :type priority: string """ if not priority: return target.topic return target.topic + '.' + priority def retry_on_retriable_kafka_error(exc): return (isinstance(exc, kafka.errors.KafkaError) and exc.retriable) def with_reconnect(retries=None): def decorator(func): @tenacity.retry( retry=tenacity.retry_if_exception(retry_on_retriable_kafka_error), wait=tenacity.wait_fixed(1), stop=tenacity.stop_after_attempt(retries), reraise=True ) def wrapper(*args, **kwargs): return func(*args, **kwargs) return wrapper return decorator class Producer(object): _producer = None _servers = None _lock = threading.Lock() @staticmethod @with_reconnect() def connect(servers, **kwargs): return kafka.KafkaProducer( bootstrap_servers=servers, selector=KAFKA_SELECTOR, **kwargs) @classmethod def producer(cls, servers, **kwargs): with cls._lock: if not cls._producer or cls._servers != servers: cls._servers = servers cls._producer = cls.connect(servers, **kwargs) return cls._producer @classmethod def cleanup(cls): with cls._lock: if cls._producer: cls._producer.close() cls._producer = None class Connection(object): def __init__(self, conf, url, purpose): self.client = None driver_conf = conf.oslo_messaging_kafka self.batch_size = driver_conf.producer_batch_size self.linger_ms = driver_conf.producer_batch_timeout * 1000 self.conf = conf self.producer = None self.consumer = None self.consumer_timeout = float(driver_conf.kafka_consumer_timeout) self.max_fetch_bytes = driver_conf.kafka_max_fetch_bytes self.group_id = driver_conf.consumer_group self.url = url self._parse_url() # TODO(Support for manual/auto_commit functionality) # When auto_commit is False, consumer can manually notify # the completion of the subscription. # Currently we don't support for non auto commit option self.auto_commit = True self._consume_loop_stopped = False def _parse_url(self): driver_conf = self.conf.oslo_messaging_kafka self.hostaddrs = [] for host in self.url.hosts: if host.hostname: self.hostaddrs.append("%s:%s" % ( host.hostname, host.port or driver_conf.kafka_default_port)) if not self.hostaddrs: self.hostaddrs.append("%s:%s" % (driver_conf.kafka_default_host, driver_conf.kafka_default_port)) def notify_send(self, topic, ctxt, msg, retry): """Send messages to Kafka broker. :param topic: String of the topic :param ctxt: context for the messages :param msg: messages for publishing :param retry: the number of retry """ message = pack_message(ctxt, msg) self._ensure_connection() self._send_and_retry(message, topic, retry) def _send_and_retry(self, message, topic, retry): if not isinstance(message, str): message = jsonutils.dumps(message) retry = retry if retry >= 0 else None @with_reconnect(retries=retry) def _send(topic, message): self.producer.send(topic, message) try: _send(topic, message) except Exception: Producer.cleanup() LOG.exception(_LE("Failed to send message")) @with_reconnect() def _poll_messages(self, timeout): return self.consumer.poll(timeout) def consume(self, timeout=None): """Receive up to 'max_fetch_messages' messages. :param timeout: poll timeout in seconds """ if self._consume_loop_stopped: return None timeout = timeout if timeout >= 0 else self.consumer_timeout try: messages = self._poll_messages(timeout) except kafka.errors.ConsumerTimeout as e: raise driver_common.Timeout(e.message) except Exception: LOG.exception(_LE("Failed to consume messages")) messages = None return messages def stop_consuming(self): self._consume_loop_stopped = True def reset(self): """Reset a connection so it can be used again.""" pass def close(self): if self.producer: self.producer.close() self.producer = None if self.consumer: self.consumer.close() self.consumer = None def commit(self): """Commit is used by subscribers belonging to the same group. After subscribing messages, commit is called to prevent the other subscribers which belong to the same group from re-subscribing the same messages. Currently self.auto_commit option is always True, so we don't need to call this function. """ self.consumer.commit() def _ensure_connection(self): try: self.producer = Producer.producer(self.hostaddrs, linger_ms=self.linger_ms, batch_size=self.batch_size) except kafka.errors.KafkaError as e: LOG.exception(_LE("KafkaProducer could not be initialized: %s"), e) raise @with_reconnect() def declare_topic_consumer(self, topics, group=None): self.consumer = kafka.KafkaConsumer( *topics, group_id=(group or self.group_id), bootstrap_servers=self.hostaddrs, max_partition_fetch_bytes=self.max_fetch_bytes, selector=KAFKA_SELECTOR ) class OsloKafkaMessage(base.RpcIncomingMessage): def __init__(self, ctxt, message): super(OsloKafkaMessage, self).__init__(ctxt, message) def requeue(self): LOG.warning(_LW("requeue is not supported")) def reply(self, reply=None, failure=None): LOG.warning(_LW("reply is not supported")) class KafkaListener(base.PollStyleListener): def __init__(self, conn): super(KafkaListener, self).__init__() self._stopped = threading.Event() self.conn = conn self.incoming_queue = [] @base.batch_poll_helper def poll(self, timeout=None): # TODO(sileht): use batch capability of kafka while not self._stopped.is_set(): if self.incoming_queue: return self.incoming_queue.pop(0) try: messages = self.conn.consume(timeout=timeout) if messages: self._put_messages_to_queue(messages) except driver_common.Timeout: return None def _put_messages_to_queue(self, messages): for topic, records in messages.items(): if records: for record in records: message, context = unpack_message(record.value) if message: self.incoming_queue.append( OsloKafkaMessage(ctxt=context, message=message)) def stop(self): self._stopped.set() self.conn.stop_consuming() def cleanup(self): self.conn.close() def commit(self): # TODO(Support for manually/auto commit functionality) # It's better to allow users to commit manually and support for # self.auto_commit = False option. For now, this commit function # is meaningless since user couldn't call this function and # auto_commit option is always True. self.conn.commit() class KafkaDriver(base.BaseDriver): """Note: Current implementation of this driver is experimental. We will have functional and/or integrated testing enabled for this driver. """ def __init__(self, conf, url, default_exchange=None, allowed_remote_exmods=None): opt_group = cfg.OptGroup(name='oslo_messaging_kafka', title='Kafka driver options') conf.register_group(opt_group) conf.register_opts(kafka_options.KAFKA_OPTS, group=opt_group) super(KafkaDriver, self).__init__( conf, url, default_exchange, allowed_remote_exmods) # the pool configuration properties max_size = self.conf.oslo_messaging_kafka.pool_size min_size = self.conf.oslo_messaging_kafka.conn_pool_min_size ttl = self.conf.oslo_messaging_kafka.conn_pool_ttl self.connection_pool = driver_pool.ConnectionPool( self.conf, max_size, min_size, ttl, self._url, Connection) self.listeners = [] def cleanup(self): for c in self.listeners: c.close() self.listeners = [] def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, retry=None): raise NotImplementedError( 'The RPC implementation for Kafka is not implemented') def send_notification(self, target, ctxt, message, version, retry=None): """Send notification to Kafka brokers :param target: Message destination target :type target: oslo_messaging.Target :param ctxt: Message context :type ctxt: dict :param message: Message payload to pass :type message: dict :param version: Messaging API version (currently not used) :type version: str :param retry: an optional default kafka consumer retries configuration None means to retry forever 0 means no retry N means N retries :type retry: int """ with self._get_connection(purpose=driver_common.PURPOSE_SEND) as conn: conn.notify_send(target_to_topic(target), ctxt, message, retry) def listen(self, target, batch_size, batch_timeout): raise NotImplementedError( 'The RPC implementation for Kafka is not implemented') def listen_for_notifications(self, targets_and_priorities, pool, batch_size, batch_timeout): """Listen to a specified list of targets on Kafka brokers :param targets_and_priorities: List of pairs (target, priority) priority is not used for kafka driver target.exchange_target.topic is used as a kafka topic :type targets_and_priorities: list :param pool: consumer group of Kafka consumers :type pool: string """ conn = self._get_connection(purpose=driver_common.PURPOSE_LISTEN) topics = set() for target, priority in targets_and_priorities: topics.add(target_to_topic(target, priority)) conn.declare_topic_consumer(topics, pool) listener = KafkaListener(conn) return base.PollStyleListenerAdapter(listener, batch_size, batch_timeout) def _get_connection(self, purpose): return driver_common.ConnectionContext(self.connection_pool, purpose)