diff --git a/monasca_common/kafka/consumer.py b/monasca_common/kafka/consumer.py index 3c2171f2..7b9a08ce 100644 --- a/monasca_common/kafka/consumer.py +++ b/monasca_common/kafka/consumer.py @@ -18,9 +18,9 @@ import logging import threading import time -import kafka.client -import kafka.common -import kafka.consumer +import monasca_common.kafka_lib.client as kafka_client +import monasca_common.kafka_lib.common as kafka_common +import monasca_common.kafka_lib.consumer as kafka_consumer from kazoo.client import KazooClient from kazoo.recipe.partitioner import SetPartitioner @@ -89,13 +89,13 @@ class KafkaConsumer(object): self._zookeeper_url = zookeeper_url self._zookeeper_path = zookeeper_path - self._kafka = kafka.client.KafkaClient(kafka_url) + self._kafka = kafka_client.KafkaClient(kafka_url) self._consumer = self._create_kafka_consumer() def _create_kafka_consumer(self, partitions=None): # No auto-commit so that commits only happen after the message is processed. - consumer = kafka.consumer.SimpleConsumer( + consumer = kafka_consumer.SimpleConsumer( self._kafka, self._kafka_group, self._kafka_topic, @@ -144,7 +144,7 @@ class KafkaConsumer(object): if time_delta.total_seconds() > self._commit_timeout: self._commit_callback() - except kafka.common.OffsetOutOfRangeError: + except kafka_common.OffsetOutOfRangeError: log.error("Kafka OffsetOutOfRange. Jumping to head.") self._consumer.seek(0, 0) diff --git a/monasca_common/kafka/producer.py b/monasca_common/kafka/producer.py index ffa9700e..b454f950 100644 --- a/monasca_common/kafka/producer.py +++ b/monasca_common/kafka/producer.py @@ -16,8 +16,8 @@ import logging import time -import kafka.client -import kafka.producer +import monasca_common.kafka_lib.client as kafka_client +import monasca_common.kafka_lib.producer as kafka_producer log = logging.getLogger(__name__) @@ -32,11 +32,11 @@ class KafkaProducer(object): url - kafka connection details """ - self._kafka = kafka.client.KafkaClient(url) - self._producer = kafka.producer.KeyedProducer( + self._kafka = kafka_client.KafkaClient(url) + self._producer = kafka_producer.KeyedProducer( self._kafka, async=False, - req_acks=kafka.producer.KeyedProducer.ACK_AFTER_LOCAL_WRITE, + req_acks=kafka_producer.KeyedProducer.ACK_AFTER_LOCAL_WRITE, ack_timeout=2000) def publish(self, topic, messages, key=None): diff --git a/monasca_common/kafka_lib/NOTES.md b/monasca_common/kafka_lib/NOTES.md new file mode 100644 index 00000000..8fb0f474 --- /dev/null +++ b/monasca_common/kafka_lib/NOTES.md @@ -0,0 +1,32 @@ +For 0.8, we have correlation id so we can potentially interleave requests/responses + +There are a few levels of abstraction: + +* Protocol support: encode/decode the requests/responses +* Socket support: send/recieve messages +* API support: higher level APIs such as: get_topic_metadata + + +# Methods of producing + +* Round robbin (each message to the next partition) +* All-to-one (each message to one partition) +* All-to-all? 
(each message to every partition) +* Partitioned (run each message through a partitioning function) +** HashPartitioned +** FunctionPartition + +# Possible API + + client = KafkaClient("localhost:9092") + + producer = KafkaProducer(client, "topic") + producer.send_string("hello") + + consumer = KafkaConsumer(client, "group", "topic") + consumer.seek(10, 2) # seek to beginning (lowest offset) + consumer.commit() # commit it + for msg in consumer.iter_messages(): + print msg + + diff --git a/monasca_common/kafka_lib/__init__.py b/monasca_common/kafka_lib/__init__.py new file mode 100644 index 00000000..8dd0a282 --- /dev/null +++ b/monasca_common/kafka_lib/__init__.py @@ -0,0 +1,21 @@ +# __title__ = 'kafka' +from .version import __version__ +__author__ = 'David Arthur' +__license__ = 'Apache License 2.0' +__copyright__ = 'Copyright 2015, David Arthur under Apache License, v2.0' + +# from monasca_common.kafka_lib.client import KafkaClient +# from monasca_common.kafka_lib.conn import KafkaConnection +# from monasca_common.kafka_lib.protocol import ( +# create_message, create_gzip_message, create_snappy_message +# ) +# from monasca_common.kafka_lib.producer import SimpleProducer, KeyedProducer +# from monasca_common.kafka_lib.partitioner import RoundRobinPartitioner, HashedPartitioner, Murmur2Partitioner +# from monasca_common.kafka_lib.consumer import SimpleConsumer, MultiProcessConsumer, KafkaConsumer +# +# __all__ = [ +# 'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'KeyedProducer', +# 'RoundRobinPartitioner', 'HashedPartitioner', 'SimpleConsumer', +# 'MultiProcessConsumer', 'create_message', 'create_gzip_message', +# 'create_snappy_message', 'KafkaConsumer', +# ] diff --git a/monasca_common/kafka_lib/client.py b/monasca_common/kafka_lib/client.py new file mode 100644 index 00000000..f179ba2f --- /dev/null +++ b/monasca_common/kafka_lib/client.py @@ -0,0 +1,676 @@ +import collections +import copy +import functools +import logging +import select +import time + +import monasca_common.kafka_lib.common as kafka_common +from monasca_common.kafka_lib.common import (TopicAndPartition, BrokerMetadata, + ConnectionError, FailedPayloadsError, + KafkaTimeoutError, KafkaUnavailableError, + LeaderNotAvailableError, UnknownTopicOrPartitionError, + NotLeaderForPartitionError, ReplicaNotAvailableError) + +from monasca_common.kafka_lib.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS +from monasca_common.kafka_lib.protocol import KafkaProtocol +from monasca_common.kafka_lib.util import kafka_bytestring + + +log = logging.getLogger(__name__) + + +class KafkaClient(object): + + CLIENT_ID = b'kafka-python' + + # NOTE: The timeout given to the client should always be greater than the + # one passed to SimpleConsumer.get_message(), otherwise you can get a + # socket timeout. 
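A minimal sketch of the note above, assuming a reachable broker at localhost:9092 and illustrative group/topic names: the client socket timeout (120 s) stays well above the blocking time handed to get_message() (5 s), so the consumer call returns before the socket can time out.

    from monasca_common.kafka_lib.client import KafkaClient
    from monasca_common.kafka_lib.consumer import SimpleConsumer

    # Socket timeout (120 s) is deliberately larger than the consumer block
    # time (5 s), so get_message() returns before the socket can time out.
    client = KafkaClient('localhost:9092', timeout=120)
    consumer = SimpleConsumer(client, b'my-group', b'my-topic')
    message = consumer.get_message(block=True, timeout=5)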
+ def __init__(self, hosts, client_id=CLIENT_ID, + timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS, + correlation_id=0): + # We need one connection to bootstrap + self.client_id = kafka_bytestring(client_id) + self.timeout = timeout + self.hosts = collect_hosts(hosts) + self.correlation_id = correlation_id + + # create connections only when we need them + self.conns = {} + self.brokers = {} # broker_id -> BrokerMetadata + self.topics_to_brokers = {} # TopicAndPartition -> BrokerMetadata + self.topic_partitions = {} # topic -> partition -> PartitionMetadata + + self.load_metadata_for_topics() # bootstrap with all metadata + + + ################## + # Private API # + ################## + + def _get_conn(self, host, port): + """Get or create a connection to a broker using host and port""" + host_key = (host, port) + if host_key not in self.conns: + self.conns[host_key] = KafkaConnection( + host, + port, + timeout=self.timeout + ) + + return self.conns[host_key] + + def _get_leader_for_partition(self, topic, partition): + """ + Returns the leader for a partition or None if the partition exists + but has no leader. + + UnknownTopicOrPartitionError will be raised if the topic or partition + is not part of the metadata. + + LeaderNotAvailableError is raised if server has metadata, but there is + no current leader + """ + + key = TopicAndPartition(topic, partition) + + # Use cached metadata if it is there + if self.topics_to_brokers.get(key) is not None: + return self.topics_to_brokers[key] + + # Otherwise refresh metadata + + # If topic does not already exist, this will raise + # UnknownTopicOrPartitionError if not auto-creating + # LeaderNotAvailableError otherwise until partitions are created + self.load_metadata_for_topics(topic) + + # If the partition doesn't actually exist, raise + if partition not in self.topic_partitions.get(topic, []): + raise UnknownTopicOrPartitionError(key) + + # If there's no leader for the partition, raise + meta = self.topic_partitions[topic][partition] + if meta.leader == -1: + raise LeaderNotAvailableError(meta) + + # Otherwise return the BrokerMetadata + return self.brokers[meta.leader] + + def _get_coordinator_for_group(self, group): + """ + Returns the coordinator broker for a consumer group. + + ConsumerCoordinatorNotAvailableCode will be raised if the coordinator + does not currently exist for the group. + + OffsetsLoadInProgressCode is raised if the coordinator is available + but is still loading offsets from the internal topic + """ + + resp = self.send_consumer_metadata_request(group) + + # If there's a problem with finding the coordinator, raise the + # provided error + kafka_common.check_error(resp) + + # Otherwise return the BrokerMetadata + return BrokerMetadata(resp.nodeId, resp.host, resp.port) + + def _next_id(self): + """Generate a new correlation id""" + # modulo to keep w/i int32 + self.correlation_id = (self.correlation_id + 1) % 2**31 + return self.correlation_id + + def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn): + """ + Attempt to send a broker-agnostic request to one of the available + brokers. Keep trying until you succeed. 
+ """ + for (host, port) in self.hosts: + requestId = self._next_id() + log.debug('Request %s: %s', requestId, payloads) + try: + conn = self._get_conn(host, port) + request = encoder_fn(client_id=self.client_id, + correlation_id=requestId, + payloads=payloads) + + conn.send(requestId, request) + response = conn.recv(requestId) + decoded = decoder_fn(response) + log.debug('Response %s: %s', requestId, decoded) + return decoded + + except Exception: + log.exception('Error sending request [%s] to server %s:%s, ' + 'trying next server', requestId, host, port) + + raise KafkaUnavailableError('All servers failed to process request') + + def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): + """ + Group a list of request payloads by topic+partition and send them to + the leader broker for that partition using the supplied encode/decode + functions + + Arguments: + + payloads: list of object-like entities with a topic (str) and + partition (int) attribute; payloads with duplicate topic-partitions + are not supported. + + encode_fn: a method to encode the list of payloads to a request body, + must accept client_id, correlation_id, and payloads as + keyword arguments + + decode_fn: a method to decode a response body into response objects. + The response objects must be object-like and have topic + and partition attributes + + Returns: + + List of response objects in the same order as the supplied payloads + """ + # encoders / decoders do not maintain ordering currently + # so we need to keep this so we can rebuild order before returning + original_ordering = [(p.topic, p.partition) for p in payloads] + + # Group the requests by topic+partition + brokers_for_payloads = [] + payloads_by_broker = collections.defaultdict(list) + + responses = {} + for payload in payloads: + try: + leader = self._get_leader_for_partition(payload.topic, + payload.partition) + payloads_by_broker[leader].append(payload) + brokers_for_payloads.append(leader) + except KafkaUnavailableError as e: + log.warning('KafkaUnavailableError attempting to send request ' + 'on topic %s partition %d', payload.topic, payload.partition) + topic_partition = (payload.topic, payload.partition) + responses[topic_partition] = FailedPayloadsError(payload) + + # For each broker, send the list of request payloads + # and collect the responses and errors + broker_failures = [] + + # For each KafkaConnection keep the real socket so that we can use + # a select to perform unblocking I/O + connections_by_socket = {} + for broker, payloads in payloads_by_broker.items(): + requestId = self._next_id() + log.debug('Request %s to %s: %s', requestId, broker, payloads) + request = encoder_fn(client_id=self.client_id, + correlation_id=requestId, payloads=payloads) + + # Send the request, recv the response + try: + conn = self._get_conn(broker.host.decode('utf-8'), broker.port) + conn.send(requestId, request) + + except ConnectionError as e: + broker_failures.append(broker) + log.warning('ConnectionError attempting to send request %s ' + 'to server %s: %s', requestId, broker, e) + + for payload in payloads: + topic_partition = (payload.topic, payload.partition) + responses[topic_partition] = FailedPayloadsError(payload) + + # No exception, try to get response + else: + + # decoder_fn=None signal that the server is expected to not + # send a response. 
This probably only applies to + # ProduceRequest w/ acks = 0 + if decoder_fn is None: + log.debug('Request %s does not expect a response ' + '(skipping conn.recv)', requestId) + for payload in payloads: + topic_partition = (payload.topic, payload.partition) + responses[topic_partition] = None + continue + else: + connections_by_socket[conn.get_connected_socket()] = (conn, broker, requestId) + + conn = None + while connections_by_socket: + sockets = connections_by_socket.keys() + rlist, _, _ = select.select(sockets, [], [], None) + conn, broker, requestId = connections_by_socket.pop(rlist[0]) + try: + response = conn.recv(requestId) + except ConnectionError as e: + broker_failures.append(broker) + log.warning('ConnectionError attempting to receive a ' + 'response to request %s from server %s: %s', + requestId, broker, e) + + for payload in payloads_by_broker[broker]: + topic_partition = (payload.topic, payload.partition) + responses[topic_partition] = FailedPayloadsError(payload) + + else: + _resps = [] + for payload_response in decoder_fn(response): + topic_partition = (payload_response.topic, + payload_response.partition) + responses[topic_partition] = payload_response + _resps.append(payload_response) + log.debug('Response %s: %s', requestId, _resps) + + # Connection errors generally mean stale metadata + # although sometimes it means incorrect api request + # Unfortunately there is no good way to tell the difference + # so we'll just reset metadata on all errors to be safe + if broker_failures: + self.reset_all_metadata() + + # Return responses in the same order as provided + return [responses[tp] for tp in original_ordering] + + def _send_consumer_aware_request(self, group, payloads, encoder_fn, decoder_fn): + """ + Send a list of requests to the consumer coordinator for the group + specified using the supplied encode/decode functions. As the payloads + that use consumer-aware requests do not contain the group (e.g. + OffsetFetchRequest), all payloads must be for a single group. + + Arguments: + + group: the name of the consumer group (str) the payloads are for + payloads: list of object-like entities with topic (str) and + partition (int) attributes; payloads with duplicate + topic+partition are not supported. + + encode_fn: a method to encode the list of payloads to a request body, + must accept client_id, correlation_id, and payloads as + keyword arguments + + decode_fn: a method to decode a response body into response objects. 
+ The response objects must be object-like and have topic + and partition attributes + + Returns: + + List of response objects in the same order as the supplied payloads + """ + # encoders / decoders do not maintain ordering currently + # so we need to keep this so we can rebuild order before returning + original_ordering = [(p.topic, p.partition) for p in payloads] + + broker = self._get_coordinator_for_group(group) + + # Send the list of request payloads and collect the responses and + # errors + responses = {} + requestId = self._next_id() + log.debug('Request %s to %s: %s', requestId, broker, payloads) + request = encoder_fn(client_id=self.client_id, + correlation_id=requestId, payloads=payloads) + + # Send the request, recv the response + try: + conn = self._get_conn(broker.host.decode('utf-8'), broker.port) + conn.send(requestId, request) + + except ConnectionError as e: + log.warning('ConnectionError attempting to send request %s ' + 'to server %s: %s', requestId, broker, e) + + for payload in payloads: + topic_partition = (payload.topic, payload.partition) + responses[topic_partition] = FailedPayloadsError(payload) + + # No exception, try to get response + else: + + # decoder_fn=None signal that the server is expected to not + # send a response. This probably only applies to + # ProduceRequest w/ acks = 0 + if decoder_fn is None: + log.debug('Request %s does not expect a response ' + '(skipping conn.recv)', requestId) + for payload in payloads: + topic_partition = (payload.topic, payload.partition) + responses[topic_partition] = None + return [] + + try: + response = conn.recv(requestId) + except ConnectionError as e: + log.warning('ConnectionError attempting to receive a ' + 'response to request %s from server %s: %s', + requestId, broker, e) + + for payload in payloads: + topic_partition = (payload.topic, payload.partition) + responses[topic_partition] = FailedPayloadsError(payload) + + else: + _resps = [] + for payload_response in decoder_fn(response): + topic_partition = (payload_response.topic, + payload_response.partition) + responses[topic_partition] = payload_response + _resps.append(payload_response) + log.debug('Response %s: %s', requestId, _resps) + + # Return responses in the same order as provided + return [responses[tp] for tp in original_ordering] + + def __repr__(self): + return '' % (self.client_id) + + def _raise_on_response_error(self, resp): + + # Response can be an unraised exception object (FailedPayloadsError) + if isinstance(resp, Exception): + raise resp + + # Or a server api error response + try: + kafka_common.check_error(resp) + except (UnknownTopicOrPartitionError, NotLeaderForPartitionError): + self.reset_topic_metadata(resp.topic) + raise + + # Return False if no error to enable list comprehensions + return False + + ################# + # Public API # + ################# + def close(self): + for conn in self.conns.values(): + conn.close() + + def copy(self): + """ + Create an inactive copy of the client object, suitable for passing + to a separate thread. + + Note that the copied connections are not initialized, so reinit() must + be called on the returned copy. 
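A minimal usage sketch, with an illustrative broker address: make the copy in the parent thread, hand it to the worker, and call reinit() there before issuing any requests.

    from monasca_common.kafka_lib.client import KafkaClient

    client = KafkaClient('localhost:9092')
    worker_client = client.copy()

    # In the worker thread, before the first request:
    worker_client.reinit()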
+ """ + c = copy.deepcopy(self) + for key in c.conns: + c.conns[key] = self.conns[key].copy() + return c + + def reinit(self): + for conn in self.conns.values(): + conn.reinit() + + def reset_topic_metadata(self, *topics): + for topic in topics: + for topic_partition in list(self.topics_to_brokers.keys()): + if topic_partition.topic == topic: + del self.topics_to_brokers[topic_partition] + if topic in self.topic_partitions: + del self.topic_partitions[topic] + + def reset_all_metadata(self): + self.topics_to_brokers.clear() + self.topic_partitions.clear() + + def has_metadata_for_topic(self, topic): + topic = kafka_bytestring(topic) + return ( + topic in self.topic_partitions + and len(self.topic_partitions[topic]) > 0 + ) + + def get_partition_ids_for_topic(self, topic): + topic = kafka_bytestring(topic) + if topic not in self.topic_partitions: + return [] + + return sorted(list(self.topic_partitions[topic])) + + @property + def topics(self): + return list(self.topic_partitions.keys()) + + def ensure_topic_exists(self, topic, timeout = 30): + start_time = time.time() + + while not self.has_metadata_for_topic(topic): + if time.time() > start_time + timeout: + raise KafkaTimeoutError('Unable to create topic {0}'.format(topic)) + try: + self.load_metadata_for_topics(topic) + except LeaderNotAvailableError: + pass + except UnknownTopicOrPartitionError: + # Server is not configured to auto-create + # retrying in this case will not help + raise + time.sleep(.5) + + def load_metadata_for_topics(self, *topics): + """ + Fetch broker and topic-partition metadata from the server, + and update internal data: + broker list, topic/partition list, and topic/parition -> broker map + + This method should be called after receiving any error + + Arguments: + *topics (optional): If a list of topics is provided, + the metadata refresh will be limited to the specified topics only. + + Exceptions: + ---------- + If the broker is configured to not auto-create topics, + expect UnknownTopicOrPartitionError for topics that don't exist + + If the broker is configured to auto-create topics, + expect LeaderNotAvailableError for new topics + until partitions have been initialized. + + Exceptions *will not* be raised in a full refresh (i.e. 
no topic list) + In this case, error codes will be logged as errors + + Partition-level errors will also not be raised here + (a single partition w/o a leader, for example) + """ + topics = [kafka_bytestring(t) for t in topics] + + if topics: + for topic in topics: + self.reset_topic_metadata(topic) + else: + self.reset_all_metadata() + + resp = self.send_metadata_request(topics) + + log.debug('Updating broker metadata: %s', resp.brokers) + log.debug('Updating topic metadata: %s', resp.topics) + + self.brokers = dict([(broker.nodeId, broker) + for broker in resp.brokers]) + + for topic_metadata in resp.topics: + topic = topic_metadata.topic + partitions = topic_metadata.partitions + + # Errors expected for new topics + try: + kafka_common.check_error(topic_metadata) + except (UnknownTopicOrPartitionError, LeaderNotAvailableError) as e: + + # Raise if the topic was passed in explicitly + if topic in topics: + raise + + # Otherwise, just log a warning + log.error('Error loading topic metadata for %s: %s', topic, type(e)) + continue + + self.topic_partitions[topic] = {} + for partition_metadata in partitions: + partition = partition_metadata.partition + leader = partition_metadata.leader + + self.topic_partitions[topic][partition] = partition_metadata + + # Populate topics_to_brokers dict + topic_part = TopicAndPartition(topic, partition) + + # Check for partition errors + try: + kafka_common.check_error(partition_metadata) + + # If No Leader, topics_to_brokers topic_partition -> None + except LeaderNotAvailableError: + log.error('No leader for topic %s partition %d', topic, partition) + self.topics_to_brokers[topic_part] = None + continue + # If one of the replicas is unavailable -- ignore + # this error code is provided for admin purposes only + # we never talk to replicas, only the leader + except ReplicaNotAvailableError: + log.debug('Some (non-leader) replicas not available for topic %s partition %d', topic, partition) + + # If Known Broker, topic_partition -> BrokerMetadata + if leader in self.brokers: + self.topics_to_brokers[topic_part] = self.brokers[leader] + + # If Unknown Broker, fake BrokerMetadata so we dont lose the id + # (not sure how this could happen. server could be in bad state) + else: + self.topics_to_brokers[topic_part] = BrokerMetadata( + leader, None, None + ) + + def send_metadata_request(self, payloads=[], fail_on_error=True, + callback=None): + encoder = KafkaProtocol.encode_metadata_request + decoder = KafkaProtocol.decode_metadata_response + + return self._send_broker_unaware_request(payloads, encoder, decoder) + + def send_consumer_metadata_request(self, payloads=[], fail_on_error=True, + callback=None): + encoder = KafkaProtocol.encode_consumer_metadata_request + decoder = KafkaProtocol.decode_consumer_metadata_response + + return self._send_broker_unaware_request(payloads, encoder, decoder) + + def send_produce_request(self, payloads=[], acks=1, timeout=1000, + fail_on_error=True, callback=None): + """ + Encode and send some ProduceRequests + + ProduceRequests will be grouped by (topic, partition) and then + sent to a specific broker. Output is a list of responses in the + same order as the list of payloads specified + + Arguments: + payloads (list of ProduceRequest): produce requests to send to kafka + ProduceRequest payloads must not contain duplicates for any + topic-partition. + acks (int, optional): how many acks the servers should receive from replica + brokers before responding to the request. If it is 0, the server + will not send any response. 
If it is 1, the server will wait + until the data is written to the local log before sending a + response. If it is -1, the server will wait until the message + is committed by all in-sync replicas before sending a response. + For any value > 1, the server will wait for this number of acks to + occur (but the server will never wait for more acknowledgements than + there are in-sync replicas). defaults to 1. + timeout (int, optional): maximum time in milliseconds the server can + await the receipt of the number of acks, defaults to 1000. + fail_on_error (bool, optional): raise exceptions on connection and + server response errors, defaults to True. + callback (function, optional): instead of returning the ProduceResponse, + first pass it through this function, defaults to None. + + Returns: + list of ProduceResponses, or callback results if supplied, in the + order of input payloads + """ + + encoder = functools.partial( + KafkaProtocol.encode_produce_request, + acks=acks, + timeout=timeout) + + if acks == 0: + decoder = None + else: + decoder = KafkaProtocol.decode_produce_response + + resps = self._send_broker_aware_request(payloads, encoder, decoder) + + return [resp if not callback else callback(resp) for resp in resps + if resp is not None and + (not fail_on_error or not self._raise_on_response_error(resp))] + + def send_fetch_request(self, payloads=[], fail_on_error=True, + callback=None, max_wait_time=100, min_bytes=4096): + """ + Encode and send a FetchRequest + + Payloads are grouped by topic and partition so they can be pipelined + to the same brokers. + """ + + encoder = functools.partial(KafkaProtocol.encode_fetch_request, + max_wait_time=max_wait_time, + min_bytes=min_bytes) + + resps = self._send_broker_aware_request( + payloads, encoder, + KafkaProtocol.decode_fetch_response) + + return [resp if not callback else callback(resp) for resp in resps + if not fail_on_error or not self._raise_on_response_error(resp)] + + def send_offset_request(self, payloads=[], fail_on_error=True, + callback=None): + resps = self._send_broker_aware_request( + payloads, + KafkaProtocol.encode_offset_request, + KafkaProtocol.decode_offset_response) + + return [resp if not callback else callback(resp) for resp in resps + if not fail_on_error or not self._raise_on_response_error(resp)] + + def send_offset_commit_request(self, group, payloads=[], + fail_on_error=True, callback=None): + encoder = functools.partial(KafkaProtocol.encode_offset_commit_request, + group=group) + decoder = KafkaProtocol.decode_offset_commit_response + resps = self._send_broker_aware_request(payloads, encoder, decoder) + + return [resp if not callback else callback(resp) for resp in resps + if not fail_on_error or not self._raise_on_response_error(resp)] + + def send_offset_fetch_request(self, group, payloads=[], + fail_on_error=True, callback=None): + + encoder = functools.partial(KafkaProtocol.encode_offset_fetch_request, + group=group) + decoder = KafkaProtocol.decode_offset_fetch_response + resps = self._send_broker_aware_request(payloads, encoder, decoder) + + return [resp if not callback else callback(resp) for resp in resps + if not fail_on_error or not self._raise_on_response_error(resp)] + + def send_offset_fetch_request_kafka(self, group, payloads=[], + fail_on_error=True, callback=None): + + encoder = functools.partial(KafkaProtocol.encode_offset_fetch_request, + group=group, from_kafka=True) + decoder = KafkaProtocol.decode_offset_fetch_response + resps = self._send_consumer_aware_request(group, payloads, encoder, 
decoder) + + return [resp if not callback else callback(resp) for resp in resps + if not fail_on_error or not self._raise_on_response_error(resp)] diff --git a/monasca_common/kafka_lib/codec.py b/monasca_common/kafka_lib/codec.py new file mode 100644 index 00000000..a9373c72 --- /dev/null +++ b/monasca_common/kafka_lib/codec.py @@ -0,0 +1,155 @@ +import gzip +from io import BytesIO +import struct + +from six.moves import xrange + +_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1) +_XERIAL_V1_FORMAT = 'bccccccBii' + +try: + import snappy + _HAS_SNAPPY = True +except ImportError: + _HAS_SNAPPY = False + + +def has_gzip(): + return True + + +def has_snappy(): + return _HAS_SNAPPY + + +def gzip_encode(payload, compresslevel=None): + if not compresslevel: + compresslevel = 9 + + with BytesIO() as buf: + + # Gzip context manager introduced in python 2.6 + # so old-fashioned way until we decide to not support 2.6 + gzipper = gzip.GzipFile(fileobj=buf, mode="w", compresslevel=compresslevel) + try: + gzipper.write(payload) + finally: + gzipper.close() + + result = buf.getvalue() + + return result + + +def gzip_decode(payload): + with BytesIO(payload) as buf: + + # Gzip context manager introduced in python 2.6 + # so old-fashioned way until we decide to not support 2.6 + gzipper = gzip.GzipFile(fileobj=buf, mode='r') + try: + result = gzipper.read() + finally: + gzipper.close() + + return result + + +def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024): + """Encodes the given data with snappy if xerial_compatible is set then the + stream is encoded in a fashion compatible with the xerial snappy library + + The block size (xerial_blocksize) controls how frequent the blocking + occurs 32k is the default in the xerial library. + + The format winds up being + +-------------+------------+--------------+------------+--------------+ + | Header | Block1 len | Block1 data | Blockn len | Blockn data | + |-------------+------------+--------------+------------+--------------| + | 16 bytes | BE int32 | snappy bytes | BE int32 | snappy bytes | + +-------------+------------+--------------+------------+--------------+ + + It is important to not that the blocksize is the amount of uncompressed + data presented to snappy at each block, whereas the blocklen is the + number of bytes that will be present in the stream, that is the + length will always be <= blocksize. + """ + + if not has_snappy(): + raise NotImplementedError("Snappy codec is not available") + + if xerial_compatible: + def _chunker(): + for i in xrange(0, len(payload), xerial_blocksize): + yield payload[i:i+xerial_blocksize] + + out = BytesIO() + + header = b''.join([struct.pack('!' + fmt, dat) for fmt, dat + in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)]) + + out.write(header) + for chunk in _chunker(): + block = snappy.compress(chunk) + block_size = len(block) + out.write(struct.pack('!i', block_size)) + out.write(block) + + out.seek(0) + return out.read() + + else: + return snappy.compress(payload) + + +def _detect_xerial_stream(payload): + """Detects if the data given might have been encoded with the blocking mode + of the xerial snappy library. 
+ + This mode writes a magic header of the format: + +--------+--------------+------------+---------+--------+ + | Marker | Magic String | Null / Pad | Version | Compat | + |--------+--------------+------------+---------+--------| + | byte | c-string | byte | int32 | int32 | + |--------+--------------+------------+---------+--------| + | -126 | 'SNAPPY' | \0 | | | + +--------+--------------+------------+---------+--------+ + + The pad appears to be to ensure that SNAPPY is a valid cstring + The version is the version of this format as written by xerial, + in the wild this is currently 1 as such we only support v1. + + Compat is there to claim the miniumum supported version that + can read a xerial block stream, presently in the wild this is + 1. + """ + + if len(payload) > 16: + header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16]) + return header == _XERIAL_V1_HEADER + return False + + +def snappy_decode(payload): + if not has_snappy(): + raise NotImplementedError("Snappy codec is not available") + + if _detect_xerial_stream(payload): + # TODO ? Should become a fileobj ? + out = BytesIO() + byt = payload[16:] + length = len(byt) + cursor = 0 + + while cursor < length: + block_size = struct.unpack_from('!i', byt[cursor:])[0] + # Skip the block size + cursor += 4 + end = cursor + block_size + out.write(snappy.decompress(byt[cursor:end])) + cursor = end + + out.seek(0) + return out.read() + else: + return snappy.decompress(payload) diff --git a/monasca_common/kafka_lib/common.py b/monasca_common/kafka_lib/common.py new file mode 100644 index 00000000..a7d81644 --- /dev/null +++ b/monasca_common/kafka_lib/common.py @@ -0,0 +1,270 @@ +import inspect +import sys +from collections import namedtuple + +############### +# Structs # +############### + +# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI +MetadataRequest = namedtuple("MetadataRequest", + ["topics"]) + +MetadataResponse = namedtuple("MetadataResponse", + ["brokers", "topics"]) + +# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ConsumerMetadataRequest +ConsumerMetadataRequest = namedtuple("ConsumerMetadataRequest", + ["groups"]) + +ConsumerMetadataResponse = namedtuple("ConsumerMetadataResponse", + ["error", "nodeId", "host", "port"]) + +# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI +ProduceRequest = namedtuple("ProduceRequest", + ["topic", "partition", "messages"]) + +ProduceResponse = namedtuple("ProduceResponse", + ["topic", "partition", "error", "offset"]) + +# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchAPI +FetchRequest = namedtuple("FetchRequest", + ["topic", "partition", "offset", "max_bytes"]) + +FetchResponse = namedtuple("FetchResponse", + ["topic", "partition", "error", "highwaterMark", "messages"]) + +# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI +OffsetRequest = namedtuple("OffsetRequest", + ["topic", "partition", "time", "max_offsets"]) + +OffsetResponse = namedtuple("OffsetResponse", + ["topic", "partition", "error", "offsets"]) + +# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI +OffsetCommitRequest = namedtuple("OffsetCommitRequest", + ["topic", "partition", "offset", "metadata"]) + 
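For reference, these commit structs are the payloads accepted by KafkaClient.send_offset_commit_request(); a minimal sketch with illustrative broker address, group, topic, partition and offset values:

    from monasca_common.kafka_lib.client import KafkaClient
    from monasca_common.kafka_lib.common import OffsetCommitRequest

    client = KafkaClient('localhost:9092')
    reqs = [OffsetCommitRequest(b'my-topic', 0, 42, None)]
    (resp,) = client.send_offset_commit_request(b'my-group', reqs)
    # resp is an OffsetCommitResponse(topic, partition, error)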
+OffsetCommitResponse = namedtuple("OffsetCommitResponse", + ["topic", "partition", "error"]) + +OffsetFetchRequest = namedtuple("OffsetFetchRequest", + ["topic", "partition"]) + +OffsetFetchResponse = namedtuple("OffsetFetchResponse", + ["topic", "partition", "offset", "metadata", "error"]) + + + +# Other useful structs +BrokerMetadata = namedtuple("BrokerMetadata", + ["nodeId", "host", "port"]) + +TopicMetadata = namedtuple("TopicMetadata", + ["topic", "error", "partitions"]) + +PartitionMetadata = namedtuple("PartitionMetadata", + ["topic", "partition", "leader", "replicas", "isr", "error"]) + +OffsetAndMessage = namedtuple("OffsetAndMessage", + ["offset", "message"]) + +Message = namedtuple("Message", + ["magic", "attributes", "key", "value"]) + +TopicAndPartition = namedtuple("TopicAndPartition", + ["topic", "partition"]) + +KafkaMessage = namedtuple("KafkaMessage", + ["topic", "partition", "offset", "key", "value"]) + +# Define retry policy for async producer +# Limit value: int >= 0, 0 means no retries +RetryOptions = namedtuple("RetryOptions", + ["limit", "backoff_ms", "retry_on_timeouts"]) + + +################# +# Exceptions # +################# + + +class KafkaError(RuntimeError): + pass + + +class BrokerResponseError(KafkaError): + pass + + +class UnknownError(BrokerResponseError): + errno = -1 + message = 'UNKNOWN' + + +class OffsetOutOfRangeError(BrokerResponseError): + errno = 1 + message = 'OFFSET_OUT_OF_RANGE' + + +class InvalidMessageError(BrokerResponseError): + errno = 2 + message = 'INVALID_MESSAGE' + + +class UnknownTopicOrPartitionError(BrokerResponseError): + errno = 3 + message = 'UNKNOWN_TOPIC_OR_PARTITON' + + +class InvalidFetchRequestError(BrokerResponseError): + errno = 4 + message = 'INVALID_FETCH_SIZE' + + +class LeaderNotAvailableError(BrokerResponseError): + errno = 5 + message = 'LEADER_NOT_AVAILABLE' + + +class NotLeaderForPartitionError(BrokerResponseError): + errno = 6 + message = 'NOT_LEADER_FOR_PARTITION' + + +class RequestTimedOutError(BrokerResponseError): + errno = 7 + message = 'REQUEST_TIMED_OUT' + + +class BrokerNotAvailableError(BrokerResponseError): + errno = 8 + message = 'BROKER_NOT_AVAILABLE' + + +class ReplicaNotAvailableError(BrokerResponseError): + errno = 9 + message = 'REPLICA_NOT_AVAILABLE' + + +class MessageSizeTooLargeError(BrokerResponseError): + errno = 10 + message = 'MESSAGE_SIZE_TOO_LARGE' + + +class StaleControllerEpochError(BrokerResponseError): + errno = 11 + message = 'STALE_CONTROLLER_EPOCH' + + +class OffsetMetadataTooLargeError(BrokerResponseError): + errno = 12 + message = 'OFFSET_METADATA_TOO_LARGE' + + +class StaleLeaderEpochCodeError(BrokerResponseError): + errno = 13 + message = 'STALE_LEADER_EPOCH_CODE' + + +class OffsetsLoadInProgressCode(BrokerResponseError): + errno = 14 + message = 'OFFSETS_LOAD_IN_PROGRESS_CODE' + + +class ConsumerCoordinatorNotAvailableCode(BrokerResponseError): + errno = 15 + message = 'CONSUMER_COORDINATOR_NOT_AVAILABLE_CODE' + + +class NotCoordinatorForConsumerCode(BrokerResponseError): + errno = 16 + message = 'NOT_COORDINATOR_FOR_CONSUMER_CODE' + + +class KafkaUnavailableError(KafkaError): + pass + + +class KafkaTimeoutError(KafkaError): + pass + + +class FailedPayloadsError(KafkaError): + def __init__(self, payload, *args): + super(FailedPayloadsError, self).__init__(*args) + self.payload = payload + + +class ConnectionError(KafkaError): + pass + + +class BufferUnderflowError(KafkaError): + pass + + +class ChecksumError(KafkaError): + pass + + +class 
ConsumerFetchSizeTooSmall(KafkaError): + pass + + +class ConsumerNoMoreData(KafkaError): + pass + + +class ConsumerTimeout(KafkaError): + pass + + +class ProtocolError(KafkaError): + pass + + +class UnsupportedCodecError(KafkaError): + pass + + +class KafkaConfigurationError(KafkaError): + pass + + +class AsyncProducerQueueFull(KafkaError): + def __init__(self, failed_msgs, *args): + super(AsyncProducerQueueFull, self).__init__(*args) + self.failed_msgs = failed_msgs + + +def _iter_broker_errors(): + for name, obj in inspect.getmembers(sys.modules[__name__]): + if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and obj != BrokerResponseError: + yield obj + + +kafka_errors = dict([(x.errno, x) for x in _iter_broker_errors()]) + + +def check_error(response): + if isinstance(response, Exception): + raise response + if response.error: + error_class = kafka_errors.get(response.error, UnknownError) + raise error_class(response) + + +RETRY_BACKOFF_ERROR_TYPES = ( + KafkaUnavailableError, LeaderNotAvailableError, + ConnectionError, FailedPayloadsError +) + + +RETRY_REFRESH_ERROR_TYPES = ( + NotLeaderForPartitionError, UnknownTopicOrPartitionError, + LeaderNotAvailableError, ConnectionError +) + + +RETRY_ERROR_TYPES = RETRY_BACKOFF_ERROR_TYPES + RETRY_REFRESH_ERROR_TYPES diff --git a/monasca_common/kafka_lib/conn.py b/monasca_common/kafka_lib/conn.py new file mode 100644 index 00000000..b520b0fa --- /dev/null +++ b/monasca_common/kafka_lib/conn.py @@ -0,0 +1,223 @@ +import copy +import logging +from random import shuffle +import socket +import struct +from threading import local + +import six + +from monasca_common.kafka_lib.common import ConnectionError + + +log = logging.getLogger(__name__) + +DEFAULT_SOCKET_TIMEOUT_SECONDS = 120 +DEFAULT_KAFKA_PORT = 9092 + + +def collect_hosts(hosts, randomize=True): + """ + Collects a comma-separated set of hosts (host:port) and optionally + randomize the returned list. + """ + + if isinstance(hosts, six.string_types): + hosts = hosts.strip().split(',') + + result = [] + for host_port in hosts: + + res = host_port.split(':') + host = res[0] + port = int(res[1]) if len(res) > 1 else DEFAULT_KAFKA_PORT + result.append((host.strip(), port)) + + if randomize: + shuffle(result) + + return result + + +class KafkaConnection(local): + """ + A socket connection to a single Kafka broker + + This class is _not_ thread safe. Each call to `send` must be followed + by a call to `recv` in order to get the correct response. Eventually, + we can do something in here to facilitate multiplexed requests/responses + since the Kafka API includes a correlation id. + + Arguments: + host: the host name or IP address of a kafka broker + port: the port number the kafka broker is listening on + timeout: default 120. The socket timeout for sending and receiving data + in seconds. None means no timeout, so a request can block forever. 
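A minimal sketch of the send/recv pairing described above; the host, port and the encoded_request payload are placeholders (a real payload would be built with KafkaProtocol):

    from monasca_common.kafka_lib.conn import KafkaConnection

    conn = KafkaConnection('localhost', 9092, timeout=120)
    correlation_id = 1
    conn.send(correlation_id, encoded_request)   # encoded_request: bytes built via KafkaProtocol
    response = conn.recv(correlation_id)         # each send must be followed by a recv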
+ """ + def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): + super(KafkaConnection, self).__init__() + self.host = host + self.port = port + self.timeout = timeout + self._sock = None + + self.reinit() + + def __getnewargs__(self): + return (self.host, self.port, self.timeout) + + def __repr__(self): + return "" % (self.host, self.port) + + ################### + # Private API # + ################### + + def _raise_connection_error(self): + # Cleanup socket if we have one + if self._sock: + self.close() + + # And then raise + raise ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port)) + + def _read_bytes(self, num_bytes): + bytes_left = num_bytes + responses = [] + + log.debug("About to read %d bytes from Kafka", num_bytes) + + # Make sure we have a connection + if not self._sock: + self.reinit() + + while bytes_left: + + try: + data = self._sock.recv(min(bytes_left, 4096)) + + # Receiving empty string from recv signals + # that the socket is in error. we will never get + # more data from this socket + if data == b'': + raise socket.error("Not enough data to read message -- did server kill socket?") + + except socket.error: + log.exception('Unable to receive data from Kafka') + self._raise_connection_error() + + bytes_left -= len(data) + log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes) + responses.append(data) + + return b''.join(responses) + + ################## + # Public API # + ################## + + # TODO multiplex socket communication to allow for multi-threaded clients + + def get_connected_socket(self): + if not self._sock: + self.reinit() + return self._sock + + def send(self, request_id, payload): + """ + Send a request to Kafka + + Arguments:: + request_id (int): can be any int (used only for debug logging...) + payload: an encoded kafka packet (see KafkaProtocol) + """ + + log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id)) + + # Make sure we have a connection + if not self._sock: + self.reinit() + + try: + self._sock.sendall(payload) + except socket.error: + log.exception('Unable to send payload to Kafka') + self._raise_connection_error() + + def recv(self, request_id): + """ + Get a response packet from Kafka + + Arguments: + request_id: can be any int (only used for debug logging...) + + Returns: + str: Encoded kafka packet response from server + """ + log.debug("Reading response %d from Kafka" % request_id) + + # Make sure we have a connection + if not self._sock: + self.reinit() + + # Read the size off of the header + resp = self._read_bytes(4) + (size,) = struct.unpack('>i', resp) + + # Read the remainder of the response + resp = self._read_bytes(size) + return resp + + def copy(self): + """ + Create an inactive copy of the connection object, suitable for + passing to a background thread. + + The returned copy is not connected; you must call reinit() before + using. 
+ """ + c = copy.deepcopy(self) + # Python 3 doesn't copy custom attributes of the threadlocal subclass + c.host = copy.copy(self.host) + c.port = copy.copy(self.port) + c.timeout = copy.copy(self.timeout) + c._sock = None + return c + + def close(self): + """ + Shutdown and close the connection socket + """ + log.debug("Closing socket connection for %s:%d" % (self.host, self.port)) + if self._sock: + # Call shutdown to be a good TCP client + # But expect an error if the socket has already been + # closed by the server + try: + self._sock.shutdown(socket.SHUT_RDWR) + except socket.error: + pass + + # Closing the socket should always succeed + self._sock.close() + self._sock = None + else: + log.debug("No socket found to close!") + + def reinit(self): + """ + Re-initialize the socket connection + close current socket (if open) + and start a fresh connection + raise ConnectionError on error + """ + log.debug("Reinitializing socket connection for %s:%d" % (self.host, self.port)) + + if self._sock: + self.close() + + try: + self._sock = socket.create_connection((self.host, self.port), self.timeout) + except socket.error: + log.exception('Unable to connect to kafka broker at %s:%d' % (self.host, self.port)) + self._raise_connection_error() diff --git a/monasca_common/kafka_lib/consumer/__init__.py b/monasca_common/kafka_lib/consumer/__init__.py new file mode 100644 index 00000000..935f56e1 --- /dev/null +++ b/monasca_common/kafka_lib/consumer/__init__.py @@ -0,0 +1,7 @@ +from .simple import SimpleConsumer +from .multiprocess import MultiProcessConsumer +from .kafka import KafkaConsumer + +__all__ = [ + 'SimpleConsumer', 'MultiProcessConsumer', 'KafkaConsumer' +] diff --git a/monasca_common/kafka_lib/consumer/base.py b/monasca_common/kafka_lib/consumer/base.py new file mode 100644 index 00000000..d14a0f8d --- /dev/null +++ b/monasca_common/kafka_lib/consumer/base.py @@ -0,0 +1,229 @@ +from __future__ import absolute_import + +import atexit +import logging +import numbers +from threading import Lock + +import monasca_common.kafka_lib.common as kafka_common +from monasca_common.kafka_lib.common import ( + OffsetRequest, OffsetCommitRequest, OffsetFetchRequest, + UnknownTopicOrPartitionError, check_error, KafkaError +) + +from monasca_common.kafka_lib.util import kafka_bytestring, ReentrantTimer + + +log = logging.getLogger('kafka.consumer') + +AUTO_COMMIT_MSG_COUNT = 100 +AUTO_COMMIT_INTERVAL = 5000 + +FETCH_DEFAULT_BLOCK_TIMEOUT = 1 +FETCH_MAX_WAIT_TIME = 100 +FETCH_MIN_BYTES = 4096 +FETCH_BUFFER_SIZE_BYTES = 4096 +MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8 + +ITER_TIMEOUT_SECONDS = 60 +NO_MESSAGES_WAIT_TIME_SECONDS = 0.1 +FULL_QUEUE_WAIT_TIME_SECONDS = 0.1 + +MAX_BACKOFF_SECONDS = 60 + +class Consumer(object): + """ + Base class to be used by other consumers. 
Not to be used directly + + This base class provides logic for + + * initialization and fetching metadata of partitions + * Auto-commit logic + * APIs for fetching pending message count + + """ + def __init__(self, client, group, topic, partitions=None, auto_commit=True, + auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, + auto_commit_every_t=AUTO_COMMIT_INTERVAL): + + self.client = client + self.topic = kafka_bytestring(topic) + self.group = None if group is None else kafka_bytestring(group) + self.client.load_metadata_for_topics(topic) + self.offsets = {} + + if partitions is None: + partitions = self.client.get_partition_ids_for_topic(topic) + else: + assert all(isinstance(x, numbers.Integral) for x in partitions) + + # Variables for handling offset commits + self.commit_lock = Lock() + self.commit_timer = None + self.count_since_commit = 0 + self.auto_commit = auto_commit + self.auto_commit_every_n = auto_commit_every_n + self.auto_commit_every_t = auto_commit_every_t + + # Set up the auto-commit timer + if auto_commit is True and auto_commit_every_t is not None: + self.commit_timer = ReentrantTimer(auto_commit_every_t, + self.commit) + self.commit_timer.start() + + # Set initial offsets + if self.group is not None: + self.fetch_last_known_offsets(partitions) + else: + for partition in partitions: + self.offsets[partition] = 0 + + # Register a cleanup handler + def cleanup(obj): + obj.stop() + self._cleanup_func = cleanup + atexit.register(cleanup, self) + + self.partition_info = False # Do not return partition info in msgs + + def provide_partition_info(self): + """ + Indicates that partition info must be returned by the consumer + """ + self.partition_info = True + + def fetch_last_known_offsets(self, partitions=None): + if self.group is None: + raise ValueError('KafkaClient.group must not be None') + + if partitions is None: + partitions = self.client.get_partition_ids_for_topic(self.topic) + + responses = self.client.send_offset_fetch_request( + self.group, + [OffsetFetchRequest(self.topic, p) for p in partitions], + fail_on_error=False + ) + + for resp in responses: + try: + check_error(resp) + # API spec says server wont set an error here + # but 0.8.1.1 does actually... + except UnknownTopicOrPartitionError: + pass + + # -1 offset signals no commit is currently stored + if resp.offset == -1: + self.offsets[resp.partition] = 0 + + # Otherwise we committed the stored offset + # and need to fetch the next one + else: + self.offsets[resp.partition] = resp.offset + + def commit(self, partitions=None): + """Commit stored offsets to Kafka via OffsetCommitRequest (v0) + + Keyword Arguments: + partitions (list): list of partitions to commit, default is to commit + all of them + + Returns: True on success, False on failure + """ + + # short circuit if nothing happened. 
This check is kept outside + # to prevent un-necessarily acquiring a lock for checking the state + if self.count_since_commit == 0: + return + + with self.commit_lock: + # Do this check again, just in case the state has changed + # during the lock acquiring timeout + if self.count_since_commit == 0: + return + + reqs = [] + if partitions is None: # commit all partitions + partitions = list(self.offsets.keys()) + + log.debug('Committing new offsets for %s, partitions %s', + self.topic, partitions) + for partition in partitions: + offset = self.offsets[partition] + log.debug('Commit offset %d in SimpleConsumer: ' + 'group=%s, topic=%s, partition=%s', + offset, self.group, self.topic, partition) + + reqs.append(OffsetCommitRequest(self.topic, partition, + offset, None)) + + try: + self.client.send_offset_commit_request(self.group, reqs) + except KafkaError as e: + log.error('%s saving offsets: %s', e.__class__.__name__, e) + return False + else: + self.count_since_commit = 0 + return True + + def _auto_commit(self): + """ + Check if we have to commit based on number of messages and commit + """ + + # Check if we are supposed to do an auto-commit + if not self.auto_commit or self.auto_commit_every_n is None: + return + + if self.count_since_commit >= self.auto_commit_every_n: + self.commit() + + def stop(self): + if self.commit_timer is not None: + self.commit_timer.stop() + self.commit() + + if hasattr(self, '_cleanup_func'): + # Remove cleanup handler now that we've stopped + + # py3 supports unregistering + if hasattr(atexit, 'unregister'): + atexit.unregister(self._cleanup_func) # pylint: disable=no-member + + # py2 requires removing from private attribute... + else: + + # ValueError on list.remove() if the exithandler no longer + # exists is fine here + try: + atexit._exithandlers.remove((self._cleanup_func, (self,), {})) + except ValueError: + pass + + del self._cleanup_func + + def pending(self, partitions=None): + """ + Gets the pending message count + + Keyword Arguments: + partitions (list): list of partitions to check for, default is to check all + """ + if partitions is None: + partitions = self.offsets.keys() + + total = 0 + reqs = [] + + for partition in partitions: + reqs.append(OffsetRequest(self.topic, partition, -1, 1)) + + resps = self.client.send_offset_request(reqs) + for resp in resps: + partition = resp.partition + pending = resp.offsets[0] + offset = self.offsets[partition] + total += pending - offset + + return total diff --git a/monasca_common/kafka_lib/consumer/kafka.py b/monasca_common/kafka_lib/consumer/kafka.py new file mode 100644 index 00000000..e32e7abc --- /dev/null +++ b/monasca_common/kafka_lib/consumer/kafka.py @@ -0,0 +1,772 @@ +from __future__ import absolute_import + +from collections import namedtuple +from copy import deepcopy +import logging +import random +import sys +import time + +import six + +from monasca_common.kafka_lib.client import KafkaClient +from monasca_common.kafka_lib.common import ( + OffsetFetchRequest, OffsetCommitRequest, OffsetRequest, FetchRequest, + check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError, + OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout, + FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError +) +from monasca_common.kafka_lib.util import kafka_bytestring + +logger = logging.getLogger(__name__) + +OffsetsStruct = namedtuple("OffsetsStruct", ["fetch", "highwater", "commit", "task_done"]) + +DEFAULT_CONSUMER_CONFIG = { + 'client_id': __name__, + 'group_id': 
None, + 'bootstrap_servers': [], + 'socket_timeout_ms': 30 * 1000, + 'fetch_message_max_bytes': 1024 * 1024, + 'auto_offset_reset': 'largest', + 'fetch_min_bytes': 1, + 'fetch_wait_max_ms': 100, + 'refresh_leader_backoff_ms': 200, + 'deserializer_class': lambda msg: msg, + 'auto_commit_enable': False, + 'auto_commit_interval_ms': 60 * 1000, + 'auto_commit_interval_messages': None, + 'consumer_timeout_ms': -1, + + # Currently unused + 'socket_receive_buffer_bytes': 64 * 1024, + 'num_consumer_fetchers': 1, + 'default_fetcher_backoff_ms': 1000, + 'queued_max_message_chunks': 10, + 'rebalance_max_retries': 4, + 'rebalance_backoff_ms': 2000, +} + +DEPRECATED_CONFIG_KEYS = { + 'metadata_broker_list': 'bootstrap_servers', +} + +class KafkaConsumer(object): + """A simpler kafka consumer""" + DEFAULT_CONFIG = deepcopy(DEFAULT_CONSUMER_CONFIG) + + def __init__(self, *topics, **configs): + self.configure(**configs) + self.set_topic_partitions(*topics) + + def configure(self, **configs): + """Configure the consumer instance + + Configuration settings can be passed to constructor, + otherwise defaults will be used: + + Keyword Arguments: + bootstrap_servers (list): List of initial broker nodes the consumer + should contact to bootstrap initial cluster metadata. This does + not have to be the full node list. It just needs to have at + least one broker that will respond to a Metadata API Request. + client_id (str): a unique name for this client. Defaults to + 'kafka.consumer.kafka'. + group_id (str): the name of the consumer group to join, + Offsets are fetched / committed to this group name. + fetch_message_max_bytes (int, optional): Maximum bytes for each + topic/partition fetch request. Defaults to 1024*1024. + fetch_min_bytes (int, optional): Minimum amount of data the server + should return for a fetch request, otherwise wait up to + fetch_wait_max_ms for more data to accumulate. Defaults to 1. + fetch_wait_max_ms (int, optional): Maximum time for the server to + block waiting for fetch_min_bytes messages to accumulate. + Defaults to 100. + refresh_leader_backoff_ms (int, optional): Milliseconds to backoff + when refreshing metadata on errors (subject to random jitter). + Defaults to 200. + socket_timeout_ms (int, optional): TCP socket timeout in + milliseconds. Defaults to 30*1000. + auto_offset_reset (str, optional): A policy for resetting offsets on + OffsetOutOfRange errors. 'smallest' will move to the oldest + available message, 'largest' will move to the most recent. Any + ofther value will raise the exception. Defaults to 'largest'. + deserializer_class (callable, optional): Any callable that takes a + raw message value and returns a deserialized value. Defaults to + lambda msg: msg. + auto_commit_enable (bool, optional): Enabling auto-commit will cause + the KafkaConsumer to periodically commit offsets without an + explicit call to commit(). Defaults to False. + auto_commit_interval_ms (int, optional): If auto_commit_enabled, + the milliseconds between automatic offset commits. Defaults to + 60 * 1000. + auto_commit_interval_messages (int, optional): If + auto_commit_enabled, a number of messages consumed between + automatic offset commits. Defaults to None (disabled). + consumer_timeout_ms (int, optional): number of millisecond to throw + a timeout exception to the consumer if no message is available + for consumption. Defaults to -1 (dont throw exception). 
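A minimal configuration sketch with illustrative topic, group and broker values:

    from monasca_common.kafka_lib.consumer import KafkaConsumer

    consumer = KafkaConsumer('my-topic',
                             group_id='my-group',
                             bootstrap_servers=['localhost:9092'],
                             auto_commit_enable=True,
                             auto_commit_interval_ms=30 * 1000,
                             consumer_timeout_ms=5000)

    # next() returns a KafkaMessage, or raises ConsumerTimeout if nothing
    # arrives within consumer_timeout_ms (5 seconds here).
    message = consumer.next()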
+ + Configuration parameters are described in more detail at + http://kafka.apache.org/documentation.html#highlevelconsumerapi + """ + configs = self._deprecate_configs(**configs) + self._config = {} + for key in self.DEFAULT_CONFIG: + self._config[key] = configs.pop(key, self.DEFAULT_CONFIG[key]) + + if configs: + raise KafkaConfigurationError('Unknown configuration key(s): ' + + str(list(configs.keys()))) + + if self._config['auto_commit_enable']: + if not self._config['group_id']: + raise KafkaConfigurationError( + 'KafkaConsumer configured to auto-commit ' + 'without required consumer group (group_id)' + ) + + # Check auto-commit configuration + if self._config['auto_commit_enable']: + logger.info("Configuring consumer to auto-commit offsets") + self._reset_auto_commit() + + if not self._config['bootstrap_servers']: + raise KafkaConfigurationError( + 'bootstrap_servers required to configure KafkaConsumer' + ) + + self._client = KafkaClient( + self._config['bootstrap_servers'], + client_id=self._config['client_id'], + timeout=(self._config['socket_timeout_ms'] / 1000.0) + ) + + def set_topic_partitions(self, *topics): + """ + Set the topic/partitions to consume + Optionally specify offsets to start from + + Accepts types: + + * str (utf-8): topic name (will consume all available partitions) + * tuple: (topic, partition) + * dict: + - { topic: partition } + - { topic: [partition list] } + - { topic: (partition tuple,) } + + Optionally, offsets can be specified directly: + + * tuple: (topic, partition, offset) + * dict: { (topic, partition): offset, ... } + + Example: + + .. code:: python + + kafka = KafkaConsumer() + + # Consume topic1-all; topic2-partition2; topic3-partition0 + kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0}) + + # Consume topic1-0 starting at offset 12, and topic2-1 at offset 45 + # using tuples -- + kafka.set_topic_partitions(("topic1", 0, 12), ("topic2", 1, 45)) + + # using dict -- + kafka.set_topic_partitions({ ("topic1", 0): 12, ("topic2", 1): 45 }) + + """ + self._topics = [] + self._client.load_metadata_for_topics() + + # Setup offsets + self._offsets = OffsetsStruct(fetch=dict(), + commit=dict(), + highwater=dict(), + task_done=dict()) + + # Handle different topic types + for arg in topics: + + # Topic name str -- all partitions + if isinstance(arg, (six.string_types, six.binary_type)): + topic = kafka_bytestring(arg) + + for partition in self._client.get_partition_ids_for_topic(topic): + self._consume_topic_partition(topic, partition) + + # (topic, partition [, offset]) tuple + elif isinstance(arg, tuple): + topic = kafka_bytestring(arg[0]) + partition = arg[1] + self._consume_topic_partition(topic, partition) + if len(arg) == 3: + offset = arg[2] + self._offsets.fetch[(topic, partition)] = offset + + # { topic: partitions, ... } dict + elif isinstance(arg, dict): + for key, value in six.iteritems(arg): + + # key can be string (a topic) + if isinstance(key, (six.string_types, six.binary_type)): + topic = kafka_bytestring(key) + + # topic: partition + if isinstance(value, int): + self._consume_topic_partition(topic, value) + + # topic: [ partition1, partition2, ... 
] + elif isinstance(value, (list, tuple)): + for partition in value: + self._consume_topic_partition(topic, partition) + else: + raise KafkaConfigurationError( + 'Unknown topic type ' + '(dict key must be int or list/tuple of ints)' + ) + + # (topic, partition): offset + elif isinstance(key, tuple): + topic = kafka_bytestring(key[0]) + partition = key[1] + self._consume_topic_partition(topic, partition) + self._offsets.fetch[(topic, partition)] = value + + else: + raise KafkaConfigurationError('Unknown topic type (%s)' % type(arg)) + + # If we have a consumer group, try to fetch stored offsets + if self._config['group_id']: + self._get_commit_offsets() + + # Update missing fetch/commit offsets + for topic_partition in self._topics: + + # Commit offsets default is None + if topic_partition not in self._offsets.commit: + self._offsets.commit[topic_partition] = None + + # Skip if we already have a fetch offset from user args + if topic_partition not in self._offsets.fetch: + + # Fetch offsets default is (1) commit + if self._offsets.commit[topic_partition] is not None: + self._offsets.fetch[topic_partition] = self._offsets.commit[topic_partition] + + # or (2) auto reset + else: + self._offsets.fetch[topic_partition] = self._reset_partition_offset(topic_partition) + + # highwater marks (received from server on fetch response) + # and task_done (set locally by user) + # should always get initialized to None + self._reset_highwater_offsets() + self._reset_task_done_offsets() + + # Reset message iterator in case we were in the middle of one + self._reset_message_iterator() + + def close(self): + """Close this consumer's underlying client.""" + self._client.close() + + def next(self): + """Return the next available message + + Blocks indefinitely unless consumer_timeout_ms > 0 + + Returns: + a single KafkaMessage from the message iterator + + Raises: + ConsumerTimeout after consumer_timeout_ms and no message + + Note: + This is also the method called internally during iteration + + """ + self._set_consumer_timeout_start() + while True: + + try: + return six.next(self._get_message_iterator()) + + # Handle batch completion + except StopIteration: + self._reset_message_iterator() + + self._check_consumer_timeout() + + def fetch_messages(self): + """Sends FetchRequests for all topic/partitions set for consumption + + Returns: + Generator that yields KafkaMessage structs + after deserializing with the configured `deserializer_class` + + Note: + Refreshes metadata on errors, and resets fetch offset on + OffsetOutOfRange, per the configured `auto_offset_reset` policy + + See Also: + Key KafkaConsumer configuration parameters: + * `fetch_message_max_bytes` + * `fetch_max_wait_ms` + * `fetch_min_bytes` + * `deserializer_class` + * `auto_offset_reset` + + """ + + max_bytes = self._config['fetch_message_max_bytes'] + max_wait_time = self._config['fetch_wait_max_ms'] + min_bytes = self._config['fetch_min_bytes'] + + if not self._topics: + raise KafkaConfigurationError('No topics or partitions configured') + + if not self._offsets.fetch: + raise KafkaConfigurationError( + 'No fetch offsets found when calling fetch_messages' + ) + + fetches = [FetchRequest(topic, partition, + self._offsets.fetch[(topic, partition)], + max_bytes) + for (topic, partition) in self._topics] + + # send_fetch_request will batch topic/partition requests by leader + responses = self._client.send_fetch_request( + fetches, + max_wait_time=max_wait_time, + min_bytes=min_bytes, + fail_on_error=False + ) + + for resp in responses: + + if 
isinstance(resp, FailedPayloadsError): + logger.warning('FailedPayloadsError attempting to fetch data') + self._refresh_metadata_on_error() + continue + + topic = kafka_bytestring(resp.topic) + partition = resp.partition + try: + check_error(resp) + except OffsetOutOfRangeError: + logger.warning('OffsetOutOfRange: topic %s, partition %d, ' + 'offset %d (Highwatermark: %d)', + topic, partition, + self._offsets.fetch[(topic, partition)], + resp.highwaterMark) + # Reset offset + self._offsets.fetch[(topic, partition)] = ( + self._reset_partition_offset((topic, partition)) + ) + continue + + except NotLeaderForPartitionError: + logger.warning("NotLeaderForPartitionError for %s - %d. " + "Metadata may be out of date", + topic, partition) + self._refresh_metadata_on_error() + continue + + except RequestTimedOutError: + logger.warning("RequestTimedOutError for %s - %d", + topic, partition) + continue + + # Track server highwater mark + self._offsets.highwater[(topic, partition)] = resp.highwaterMark + + # Yield each message + # Kafka-python could raise an exception during iteration + # we are not catching -- user will need to address + for (offset, message) in resp.messages: + # deserializer_class could raise an exception here + val = self._config['deserializer_class'](message.value) + msg = KafkaMessage(topic, partition, offset, message.key, val) + + # in some cases the server will return earlier messages + # than we requested. skip them per kafka spec + if offset < self._offsets.fetch[(topic, partition)]: + logger.debug('message offset less than fetched offset ' + 'skipping: %s', msg) + continue + # Only increment fetch offset + # if we safely got the message and deserialized + self._offsets.fetch[(topic, partition)] = offset + 1 + + # Then yield to user + yield msg + + def get_partition_offsets(self, topic, partition, request_time_ms, max_num_offsets): + """Request available fetch offsets for a single topic/partition + + Keyword Arguments: + topic (str): topic for offset request + partition (int): partition for offset request + request_time_ms (int): Used to ask for all messages before a + certain time (ms). There are two special values. + Specify -1 to receive the latest offset (i.e. the offset of the + next coming message) and -2 to receive the earliest available + offset. Note that because offsets are pulled in descending + order, asking for the earliest offset will always return you a + single element. + max_num_offsets (int): Maximum offsets to include in the OffsetResponse + + Returns: + a list of offsets in the OffsetResponse submitted for the provided + topic / partition. See: + https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI + """ + reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)] + + (resp,) = self._client.send_offset_request(reqs) + + check_error(resp) + + # Just for sanity.. + # probably unnecessary + assert resp.topic == topic + assert resp.partition == partition + + return resp.offsets + + def offsets(self, group=None): + """Get internal consumer offset values + + Keyword Arguments: + group: Either "fetch", "commit", "task_done", or "highwater". + If no group specified, returns all groups. 
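A sketch of the two special request times accepted by get_partition_offsets() above, continuing the `consumer` from the earlier sketch; topic and partition values are placeholders.

    (earliest,) = consumer.get_partition_offsets(b'my_topic', 0, -2, max_num_offsets=1)
    (latest,) = consumer.get_partition_offsets(b'my_topic', 0, -1, max_num_offsets=1)
    # `latest` is the offset of the next message to be produced, so the
    # partition currently holds offsets earliest .. latest - 1 (if non-empty).
    print('partition 0: %d messages' % (latest - earliest))
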
+ + Returns: + A copy of internal offsets struct + """ + if not group: + return { + 'fetch': self.offsets('fetch'), + 'commit': self.offsets('commit'), + 'task_done': self.offsets('task_done'), + 'highwater': self.offsets('highwater') + } + else: + return dict(deepcopy(getattr(self._offsets, group))) + + def task_done(self, message): + """Mark a fetched message as consumed. + + Offsets for messages marked as "task_done" will be stored back + to the kafka cluster for this consumer group on commit() + + Arguments: + message (KafkaMessage): the message to mark as complete + + Returns: + True, unless the topic-partition for this message has not + been configured for the consumer. In normal operation, this + should not happen. But see github issue 364. + """ + topic_partition = (message.topic, message.partition) + if topic_partition not in self._topics: + logger.warning('Unrecognized topic/partition in task_done message: ' + '{0}:{1}'.format(*topic_partition)) + return False + + offset = message.offset + + # Warn on non-contiguous offsets + prev_done = self._offsets.task_done[topic_partition] + if prev_done is not None and offset != (prev_done + 1): + logger.warning('Marking task_done on a non-continuous offset: %d != %d + 1', + offset, prev_done) + + # Warn on smaller offsets than previous commit + # "commit" offsets are actually the offset of the next message to fetch. + prev_commit = self._offsets.commit[topic_partition] + if prev_commit is not None and ((offset + 1) <= prev_commit): + logger.warning('Marking task_done on a previously committed offset?: %d (+1) <= %d', + offset, prev_commit) + + self._offsets.task_done[topic_partition] = offset + + # Check for auto-commit + if self._does_auto_commit_messages(): + self._incr_auto_commit_message_count() + + if self._should_auto_commit(): + self.commit() + + return True + + def commit(self): + """Store consumed message offsets (marked via task_done()) + to kafka cluster for this consumer_group. 
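The intended task_done()/commit() workflow, sketched under the assumption that `consumer` was configured with a group_id and auto_commit_enable=False; handle() is a hypothetical user callback.

    for message in consumer.fetch_messages():
        handle(message)               # hypothetical processing step
        consumer.task_done(message)   # queue this offset for the next commit()

    consumer.commit()   # stores task_done offsets + 1 for the consumer group
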
+ + Returns: + True on success, or False if no offsets were found for commit + + Note: + this functionality requires server version >=0.8.1.1 + https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI + """ + if not self._config['group_id']: + logger.warning('Cannot commit without a group_id!') + raise KafkaConfigurationError( + 'Attempted to commit offsets ' + 'without a configured consumer group (group_id)' + ) + + # API supports storing metadata with each commit + # but for now it is unused + metadata = b'' + + offsets = self._offsets.task_done + commits = [] + for topic_partition, task_done_offset in six.iteritems(offsets): + + # Skip if None + if task_done_offset is None: + continue + + # Commit offsets as the next offset to fetch + # which is consistent with the Java Client + # task_done is marked by messages consumed, + # so add one to mark the next message for fetching + commit_offset = (task_done_offset + 1) + + # Skip if no change from previous committed + if commit_offset == self._offsets.commit[topic_partition]: + continue + + commits.append( + OffsetCommitRequest(topic_partition[0], topic_partition[1], + commit_offset, metadata) + ) + + if commits: + logger.info('committing consumer offsets to group %s', self._config['group_id']) + resps = self._client.send_offset_commit_request( + kafka_bytestring(self._config['group_id']), commits, + fail_on_error=False + ) + + for r in resps: + check_error(r) + topic_partition = (r.topic, r.partition) + task_done = self._offsets.task_done[topic_partition] + self._offsets.commit[topic_partition] = (task_done + 1) + + if self._config['auto_commit_enable']: + self._reset_auto_commit() + + return True + + else: + logger.info('No new offsets found to commit in group %s', self._config['group_id']) + return False + + # + # Topic/partition management private methods + # + + def _consume_topic_partition(self, topic, partition): + topic = kafka_bytestring(topic) + if not isinstance(partition, int): + raise KafkaConfigurationError('Unknown partition type (%s) ' + '-- expected int' % type(partition)) + + if topic not in self._client.topic_partitions: + raise UnknownTopicOrPartitionError("Topic %s not found in broker metadata" % topic) + if partition not in self._client.get_partition_ids_for_topic(topic): + raise UnknownTopicOrPartitionError("Partition %d not found in Topic %s " + "in broker metadata" % (partition, topic)) + logger.info("Configuring consumer to fetch topic '%s', partition %d", topic, partition) + self._topics.append((topic, partition)) + + def _refresh_metadata_on_error(self): + refresh_ms = self._config['refresh_leader_backoff_ms'] + jitter_pct = 0.20 + sleep_ms = random.randint( + int((1.0 - 0.5 * jitter_pct) * refresh_ms), + int((1.0 + 0.5 * jitter_pct) * refresh_ms) + ) + while True: + logger.info("Sleeping for refresh_leader_backoff_ms: %d", sleep_ms) + time.sleep(sleep_ms / 1000.0) + try: + self._client.load_metadata_for_topics() + except KafkaUnavailableError: + logger.warning("Unable to refresh topic metadata... 
cluster unavailable") + self._check_consumer_timeout() + else: + logger.info("Topic metadata refreshed") + return + + # + # Offset-managment private methods + # + + def _get_commit_offsets(self): + logger.info("Consumer fetching stored offsets") + for topic_partition in self._topics: + (resp,) = self._client.send_offset_fetch_request( + kafka_bytestring(self._config['group_id']), + [OffsetFetchRequest(topic_partition[0], topic_partition[1])], + fail_on_error=False) + try: + check_error(resp) + # API spec says server wont set an error here + # but 0.8.1.1 does actually... + except UnknownTopicOrPartitionError: + pass + + # -1 offset signals no commit is currently stored + if resp.offset == -1: + self._offsets.commit[topic_partition] = None + + # Otherwise we committed the stored offset + # and need to fetch the next one + else: + self._offsets.commit[topic_partition] = resp.offset + + def _reset_highwater_offsets(self): + for topic_partition in self._topics: + self._offsets.highwater[topic_partition] = None + + def _reset_task_done_offsets(self): + for topic_partition in self._topics: + self._offsets.task_done[topic_partition] = None + + def _reset_partition_offset(self, topic_partition): + (topic, partition) = topic_partition + LATEST = -1 + EARLIEST = -2 + + request_time_ms = None + if self._config['auto_offset_reset'] == 'largest': + request_time_ms = LATEST + elif self._config['auto_offset_reset'] == 'smallest': + request_time_ms = EARLIEST + else: + + # Let's raise an reasonable exception type if user calls + # outside of an exception context + if sys.exc_info() == (None, None, None): + raise OffsetOutOfRangeError('Cannot reset partition offsets without a ' + 'valid auto_offset_reset setting ' + '(largest|smallest)') + + # Otherwise we should re-raise the upstream exception + # b/c it typically includes additional data about + # the request that triggered it, and we do not want to drop that + raise # pylint: disable-msg=E0704 + + (offset, ) = self.get_partition_offsets(topic, partition, + request_time_ms, max_num_offsets=1) + return offset + + # + # Consumer Timeout private methods + # + + def _set_consumer_timeout_start(self): + self._consumer_timeout = False + if self._config['consumer_timeout_ms'] >= 0: + self._consumer_timeout = time.time() + (self._config['consumer_timeout_ms'] / 1000.0) + + def _check_consumer_timeout(self): + if self._consumer_timeout and time.time() > self._consumer_timeout: + raise ConsumerTimeout('Consumer timed out after %d ms' % + self._config['consumer_timeout_ms']) + + # + # Autocommit private methods + # + + def _should_auto_commit(self): + if self._does_auto_commit_ms(): + if time.time() >= self._next_commit_time: + return True + + if self._does_auto_commit_messages(): + if self._uncommitted_message_count >= self._config['auto_commit_interval_messages']: + return True + + return False + + def _reset_auto_commit(self): + self._uncommitted_message_count = 0 + self._next_commit_time = None + if self._does_auto_commit_ms(): + self._next_commit_time = time.time() + (self._config['auto_commit_interval_ms'] / 1000.0) + + def _incr_auto_commit_message_count(self, n=1): + self._uncommitted_message_count += n + + def _does_auto_commit_ms(self): + if not self._config['auto_commit_enable']: + return False + + conf = self._config['auto_commit_interval_ms'] + if conf is not None and conf > 0: + return True + return False + + def _does_auto_commit_messages(self): + if not self._config['auto_commit_enable']: + return False + + conf = 
self._config['auto_commit_interval_messages'] + if conf is not None and conf > 0: + return True + return False + + # + # Message iterator private methods + # + + def __iter__(self): + return self + + def __next__(self): + return self.next() + + def _get_message_iterator(self): + # Fetch a new batch if needed + if self._msg_iter is None: + self._msg_iter = self.fetch_messages() + + return self._msg_iter + + def _reset_message_iterator(self): + self._msg_iter = None + + # + # python private methods + # + + def __repr__(self): + return '<{0} topics=({1})>'.format( + self.__class__.__name__, + '|'.join(["%s-%d" % topic_partition + for topic_partition in self._topics]) + ) + + # + # other private methods + # + + def _deprecate_configs(self, **configs): + for old, new in six.iteritems(DEPRECATED_CONFIG_KEYS): + if old in configs: + logger.warning('Deprecated Kafka Consumer configuration: %s. ' + 'Please use %s instead.', old, new) + old_value = configs.pop(old) + if new not in configs: + configs[new] = old_value + return configs diff --git a/monasca_common/kafka_lib/consumer/multiprocess.py b/monasca_common/kafka_lib/consumer/multiprocess.py new file mode 100644 index 00000000..18a50144 --- /dev/null +++ b/monasca_common/kafka_lib/consumer/multiprocess.py @@ -0,0 +1,292 @@ +from __future__ import absolute_import + +from collections import namedtuple +import logging +from multiprocessing import Process, Manager as MPManager +try: + import queue # python 3 +except ImportError: + import Queue as queue # python 2 +import time + +from ..common import KafkaError +from .base import ( + Consumer, + AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL, + NO_MESSAGES_WAIT_TIME_SECONDS, + FULL_QUEUE_WAIT_TIME_SECONDS, + MAX_BACKOFF_SECONDS, +) +from .simple import SimpleConsumer + + +log = logging.getLogger(__name__) + +Events = namedtuple("Events", ["start", "pause", "exit"]) + + +def _mp_consume(client, group, topic, queue, size, events, **consumer_options): + """ + A child process worker which consumes messages based on the + notifications given by the controller process + + NOTE: Ideally, this should have been a method inside the Consumer + class. However, multiprocessing module has issues in windows. The + functionality breaks unless this function is kept outside of a class + """ + + # Initial interval for retries in seconds. + interval = 1 + while not events.exit.is_set(): + try: + # Make the child processes open separate socket connections + client.reinit() + + # We will start consumers without auto-commit. Auto-commit will be + # done by the master controller process. + consumer = SimpleConsumer(client, group, topic, + auto_commit=False, + auto_commit_every_n=None, + auto_commit_every_t=None, + **consumer_options) + + # Ensure that the consumer provides the partition information + consumer.provide_partition_info() + + while True: + # Wait till the controller indicates us to start consumption + events.start.wait() + + # If we are asked to quit, do so + if events.exit.is_set(): + break + + # Consume messages and add them to the queue. If the controller + # indicates a specific number of messages, follow that advice + count = 0 + + message = consumer.get_message() + if message: + while True: + try: + queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS) + break + except queue.Full: + if events.exit.is_set(): break + + count += 1 + + # We have reached the required size. The controller might have + # more than what he needs. Wait for a while. 
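A small sketch of the deprecated-key handling in the KafkaConsumer module a little earlier (_deprecate_configs together with DEPRECATED_CONFIG_KEYS); broker addresses are placeholders and a reachable broker is assumed.

    # Both calls configure the same consumer; the first logs a deprecation
    # warning and rewrites metadata_broker_list to bootstrap_servers.
    KafkaConsumer('my_topic', metadata_broker_list=['localhost:9092'])
    KafkaConsumer('my_topic', bootstrap_servers=['localhost:9092'])
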
+ # Without this logic, it is possible that we run into a big + # loop consuming all available messages before the controller + # can reset the 'start' event + if count == size.value: + events.pause.wait() + + else: + # In case we did not receive any message, give up the CPU for + # a while before we try again + time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS) + + consumer.stop() + + except KafkaError as e: + # Retry with exponential backoff + log.error("Problem communicating with Kafka (%s), retrying in %d seconds..." % (e, interval)) + time.sleep(interval) + interval = interval*2 if interval*2 < MAX_BACKOFF_SECONDS else MAX_BACKOFF_SECONDS + + +class MultiProcessConsumer(Consumer): + """ + A consumer implementation that consumes partitions for a topic in + parallel using multiple processes + + Arguments: + client: a connected KafkaClient + group: a name for this consumer, used for offset storage and must be unique + If you are connecting to a server that does not support offset + commit/fetch (any prior to 0.8.1.1), then you *must* set this to None + topic: the topic to consume + + Keyword Arguments: + partitions: An optional list of partitions to consume the data from + auto_commit: default True. Whether or not to auto commit the offsets + auto_commit_every_n: default 100. How many messages to consume + before a commit + auto_commit_every_t: default 5000. How much time (in milliseconds) to + wait before commit + num_procs: Number of processes to start for consuming messages. + The available partitions will be divided among these processes + partitions_per_proc: Number of partitions to be allocated per process + (overrides num_procs) + + Auto commit details: + If both auto_commit_every_n and auto_commit_every_t are set, they will + reset one another when one is triggered. These triggers simply call the + commit method on this class. A manual call to commit will also reset + these triggers + """ + def __init__(self, client, group, topic, + partitions=None, + auto_commit=True, + auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, + auto_commit_every_t=AUTO_COMMIT_INTERVAL, + num_procs=1, + partitions_per_proc=0, + **simple_consumer_options): + + # Initiate the base consumer class + super(MultiProcessConsumer, self).__init__( + client, group, topic, + partitions=partitions, + auto_commit=auto_commit, + auto_commit_every_n=auto_commit_every_n, + auto_commit_every_t=auto_commit_every_t) + + # Variables for managing and controlling the data flow from + # consumer child process to master + manager = MPManager() + self.queue = manager.Queue(1024) # Child consumers dump messages into this + self.events = Events( + start = manager.Event(), # Indicates the consumers to start fetch + exit = manager.Event(), # Requests the consumers to shutdown + pause = manager.Event()) # Requests the consumers to pause fetch + self.size = manager.Value('i', 0) # Indicator of number of messages to fetch + + # dict.keys() returns a view in py3 + it's not a thread-safe operation + # http://blog.labix.org/2008/06/27/watch-out-for-listdictkeys-in-python-3 + # It's safer to copy dict as it only runs during the init. 
+ partitions = list(self.offsets.copy().keys()) + + # By default, start one consumer process for all partitions + # The logic below ensures that + # * we do not cross the num_procs limit + # * we have an even distribution of partitions among processes + + if partitions_per_proc: + num_procs = len(partitions) / partitions_per_proc + if num_procs * partitions_per_proc < len(partitions): + num_procs += 1 + + # The final set of chunks + chunks = [partitions[proc::num_procs] for proc in range(num_procs)] + + self.procs = [] + for chunk in chunks: + options = {'partitions': list(chunk)} + if simple_consumer_options: + simple_consumer_options.pop('partitions', None) + options.update(simple_consumer_options) + + args = (client.copy(), self.group, self.topic, self.queue, + self.size, self.events) + proc = Process(target=_mp_consume, args=args, kwargs=options) + proc.daemon = True + proc.start() + self.procs.append(proc) + + def __repr__(self): + return '' % \ + (self.group, self.topic, len(self.procs)) + + def stop(self): + # Set exit and start off all waiting consumers + self.events.exit.set() + self.events.pause.set() + self.events.start.set() + + for proc in self.procs: + proc.join() + proc.terminate() + + super(MultiProcessConsumer, self).stop() + + def __iter__(self): + """ + Iterator to consume the messages available on this consumer + """ + # Trigger the consumer procs to start off. + # We will iterate till there are no more messages available + self.size.value = 0 + self.events.pause.set() + + while True: + self.events.start.set() + try: + # We will block for a small while so that the consumers get + # a chance to run and put some messages in the queue + # TODO: This is a hack and will make the consumer block for + # at least one second. Need to find a better way of doing this + partition, message = self.queue.get(block=True, timeout=1) + except queue.Empty: + break + + # Count, check and commit messages if necessary + self.offsets[partition] = message.offset + 1 + self.events.start.clear() + self.count_since_commit += 1 + self._auto_commit() + yield message + + self.events.start.clear() + + def get_messages(self, count=1, block=True, timeout=10): + """ + Fetch the specified number of messages + + Keyword Arguments: + count: Indicates the maximum number of messages to be fetched + block: If True, the API will block till all messages are fetched. + If block is a positive integer the API will block until that + many messages are fetched. + timeout: When blocking is requested the function will block for + the specified time (in seconds) until count messages is + fetched. If None, it will block forever. + """ + messages = [] + + # Give a size hint to the consumers. Each consumer process will fetch + # a maximum of "count" messages. This will fetch more messages than + # necessary, but these will not be committed to kafka. 
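A usage sketch for MultiProcessConsumer (not part of the patch): it assumes a reachable broker, a broker version that supports offset commit/fetch for the given group, and placeholder names throughout.

    from monasca_common.kafka_lib.client import KafkaClient
    from monasca_common.kafka_lib.consumer.multiprocess import MultiProcessConsumer

    client = KafkaClient('localhost:9092')
    consumer = MultiProcessConsumer(client, b'my_group', b'my_topic',
                                    num_procs=2)   # two worker processes
    for message in consumer.get_messages(count=100, block=True, timeout=10):
        print(message)
    consumer.stop()   # sets the exit/pause/start events and joins the workers
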
Also, the extra + # messages can be provided in subsequent runs + self.size.value = count + self.events.pause.clear() + + if timeout is not None: + max_time = time.time() + timeout + + new_offsets = {} + while count > 0 and (timeout is None or timeout > 0): + # Trigger consumption only if the queue is empty + # By doing this, we will ensure that consumers do not + # go into overdrive and keep consuming thousands of + # messages when the user might need only a few + if self.queue.empty(): + self.events.start.set() + + block_next_call = block is True or block > len(messages) + try: + partition, message = self.queue.get(block_next_call, + timeout) + except queue.Empty: + break + + _msg = (partition, message) if self.partition_info else message + messages.append(_msg) + new_offsets[partition] = message.offset + 1 + count -= 1 + if timeout is not None: + timeout = max_time - time.time() + + self.size.value = 0 + self.events.start.clear() + self.events.pause.set() + + # Update and commit offsets if necessary + self.offsets.update(new_offsets) + self.count_since_commit += len(messages) + self._auto_commit() + + return messages diff --git a/monasca_common/kafka_lib/consumer/simple.py b/monasca_common/kafka_lib/consumer/simple.py new file mode 100644 index 00000000..7c632464 --- /dev/null +++ b/monasca_common/kafka_lib/consumer/simple.py @@ -0,0 +1,444 @@ +from __future__ import absolute_import + +try: + from itertools import zip_longest as izip_longest, repeat # pylint: disable-msg=E0611 +except ImportError: + from itertools import izip_longest as izip_longest, repeat # python 2 +import logging +try: + import queue # python 3 +except ImportError: + import Queue as queue # python 2 +import sys +import time + +import six + +from .base import ( + Consumer, + FETCH_DEFAULT_BLOCK_TIMEOUT, + AUTO_COMMIT_MSG_COUNT, + AUTO_COMMIT_INTERVAL, + FETCH_MIN_BYTES, + FETCH_BUFFER_SIZE_BYTES, + MAX_FETCH_BUFFER_SIZE_BYTES, + FETCH_MAX_WAIT_TIME, + ITER_TIMEOUT_SECONDS, + NO_MESSAGES_WAIT_TIME_SECONDS +) +from ..common import ( + FetchRequest, KafkaError, OffsetRequest, + ConsumerFetchSizeTooSmall, ConsumerNoMoreData, + UnknownTopicOrPartitionError, NotLeaderForPartitionError, + OffsetOutOfRangeError, FailedPayloadsError, check_error +) + + +log = logging.getLogger(__name__) + + +class FetchContext(object): + """ + Class for managing the state of a consumer during fetch + """ + def __init__(self, consumer, block, timeout): + self.consumer = consumer + self.block = block + + if block: + if not timeout: + timeout = FETCH_DEFAULT_BLOCK_TIMEOUT + self.timeout = timeout * 1000 + + def __enter__(self): + """Set fetch values based on blocking status""" + self.orig_fetch_max_wait_time = self.consumer.fetch_max_wait_time + self.orig_fetch_min_bytes = self.consumer.fetch_min_bytes + if self.block: + self.consumer.fetch_max_wait_time = self.timeout + self.consumer.fetch_min_bytes = 1 + else: + self.consumer.fetch_min_bytes = 0 + + def __exit__(self, type, value, traceback): + """Reset values""" + self.consumer.fetch_max_wait_time = self.orig_fetch_max_wait_time + self.consumer.fetch_min_bytes = self.orig_fetch_min_bytes + + +class SimpleConsumer(Consumer): + """ + A simple consumer implementation that consumes all/specified partitions + for a topic + + Arguments: + client: a connected KafkaClient + group: a name for this consumer, used for offset storage and must be unique + If you are connecting to a server that does not support offset + commit/fetch (any prior to 0.8.1.1), then you *must* set this to None + topic: the 
topic to consume + + Keyword Arguments: + partitions: An optional list of partitions to consume the data from + + auto_commit: default True. Whether or not to auto commit the offsets + + auto_commit_every_n: default 100. How many messages to consume + before a commit + + auto_commit_every_t: default 5000. How much time (in milliseconds) to + wait before commit + fetch_size_bytes: number of bytes to request in a FetchRequest + + buffer_size: default 4K. Initial number of bytes to tell kafka we + have available. This will double as needed. + + max_buffer_size: default 16K. Max number of bytes to tell kafka we have + available. None means no limit. + + iter_timeout: default None. How much time (in seconds) to wait for a + message in the iterator before exiting. None means no + timeout, so it will wait forever. + + auto_offset_reset: default largest. Reset partition offsets upon + OffsetOutOfRangeError. Valid values are largest and smallest. + Otherwise, do not reset the offsets and raise OffsetOutOfRangeError. + + Auto commit details: + If both auto_commit_every_n and auto_commit_every_t are set, they will + reset one another when one is triggered. These triggers simply call the + commit method on this class. A manual call to commit will also reset + these triggers + """ + def __init__(self, client, group, topic, auto_commit=True, partitions=None, + auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, + auto_commit_every_t=AUTO_COMMIT_INTERVAL, + fetch_size_bytes=FETCH_MIN_BYTES, + buffer_size=FETCH_BUFFER_SIZE_BYTES, + max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES, + iter_timeout=None, + auto_offset_reset='largest'): + super(SimpleConsumer, self).__init__( + client, group, topic, + partitions=partitions, + auto_commit=auto_commit, + auto_commit_every_n=auto_commit_every_n, + auto_commit_every_t=auto_commit_every_t) + + if max_buffer_size is not None and buffer_size > max_buffer_size: + raise ValueError('buffer_size (%d) is greater than ' + 'max_buffer_size (%d)' % + (buffer_size, max_buffer_size)) + self.buffer_size = buffer_size + self.max_buffer_size = max_buffer_size + self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME + self.fetch_min_bytes = fetch_size_bytes + self.fetch_offsets = self.offsets.copy() + self.iter_timeout = iter_timeout + self.auto_offset_reset = auto_offset_reset + self.queue = queue.Queue() + + def __repr__(self): + return '' % \ + (self.group, self.topic, str(self.offsets.keys())) + + def reset_partition_offset(self, partition): + """Update offsets using auto_offset_reset policy (smallest|largest) + + Arguments: + partition (int): the partition for which offsets should be updated + + Returns: Updated offset on success, None on failure + """ + LATEST = -1 + EARLIEST = -2 + if self.auto_offset_reset == 'largest': + reqs = [OffsetRequest(self.topic, partition, LATEST, 1)] + elif self.auto_offset_reset == 'smallest': + reqs = [OffsetRequest(self.topic, partition, EARLIEST, 1)] + else: + # Let's raise an reasonable exception type if user calls + # outside of an exception context + if sys.exc_info() == (None, None, None): + raise OffsetOutOfRangeError('Cannot reset partition offsets without a ' + 'valid auto_offset_reset setting ' + '(largest|smallest)') + # Otherwise we should re-raise the upstream exception + # b/c it typically includes additional data about + # the request that triggered it, and we do not want to drop that + raise # pylint: disable-msg=E0704 + + # send_offset_request + log.info('Resetting topic-partition offset to %s for %s:%d', + self.auto_offset_reset, self.topic, 
partition) + try: + (resp, ) = self.client.send_offset_request(reqs) + except KafkaError as e: + log.error('%s sending offset request for %s:%d', + e.__class__.__name__, self.topic, partition) + else: + self.offsets[partition] = resp.offsets[0] + self.fetch_offsets[partition] = resp.offsets[0] + return resp.offsets[0] + + def seek(self, offset, whence=None, partition=None): + """ + Alter the current offset in the consumer, similar to fseek + + Arguments: + offset: how much to modify the offset + whence: where to modify it from, default is None + + * None is an absolute offset + * 0 is relative to the earliest available offset (head) + * 1 is relative to the current offset + * 2 is relative to the latest known offset (tail) + + partition: modify which partition, default is None. + If partition is None, would modify all partitions. + """ + + if whence is None: # set an absolute offset + if partition is None: + for tmp_partition in self.offsets: + self.offsets[tmp_partition] = offset + else: + self.offsets[partition] = offset + elif whence == 1: # relative to current position + if partition is None: + for tmp_partition, _offset in self.offsets.items(): + self.offsets[tmp_partition] = _offset + offset + else: + self.offsets[partition] += offset + elif whence in (0, 2): # relative to beginning or end + reqs = [] + deltas = {} + if partition is None: + # divide the request offset by number of partitions, + # distribute the remained evenly + (delta, rem) = divmod(offset, len(self.offsets)) + for tmp_partition, r in izip_longest(self.offsets.keys(), + repeat(1, rem), + fillvalue=0): + deltas[tmp_partition] = delta + r + + for tmp_partition in self.offsets.keys(): + if whence == 0: + reqs.append(OffsetRequest(self.topic, + tmp_partition, + -2, + 1)) + elif whence == 2: + reqs.append(OffsetRequest(self.topic, + tmp_partition, + -1, + 1)) + else: + pass + else: + deltas[partition] = offset + if whence == 0: + reqs.append(OffsetRequest(self.topic, partition, -2, 1)) + elif whence == 2: + reqs.append(OffsetRequest(self.topic, partition, -1, 1)) + else: + pass + + resps = self.client.send_offset_request(reqs) + for resp in resps: + self.offsets[resp.partition] = \ + resp.offsets[0] + deltas[resp.partition] + else: + raise ValueError('Unexpected value for `whence`, %d' % whence) + + # Reset queue and fetch offsets since they are invalid + self.fetch_offsets = self.offsets.copy() + self.count_since_commit += 1 + if self.auto_commit: + self.commit() + + self.queue = queue.Queue() + + def get_messages(self, count=1, block=True, timeout=0.1): + """ + Fetch the specified number of messages + + Keyword Arguments: + count: Indicates the maximum number of messages to be fetched + block: If True, the API will block till all messages are fetched. + If block is a positive integer the API will block until that + many messages are fetched. + timeout: When blocking is requested the function will block for + the specified time (in seconds) until count messages is + fetched. If None, it will block forever. 
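The seek() semantics above in one sketch; broker, group and topic are placeholders, and auto_commit is disabled so the seeks do not trigger commits.

    from monasca_common.kafka_lib.client import KafkaClient
    from monasca_common.kafka_lib.consumer.simple import SimpleConsumer

    client = KafkaClient('localhost:9092')
    consumer = SimpleConsumer(client, b'my_group', b'my_topic', auto_commit=False)

    consumer.seek(0, 0)                   # whence=0: head of every partition
    consumer.seek(0, 2)                   # whence=2: tail of every partition
    consumer.seek(5, 1)                   # whence=1: advance each partition by 5
    consumer.seek(42, None, partition=0)  # absolute offset on one partition

    batch = consumer.get_messages(count=50, block=True, timeout=5)
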
+ """ + messages = [] + if timeout is not None: + timeout += time.time() + + new_offsets = {} + log.debug('getting %d messages', count) + while len(messages) < count: + block_time = timeout - time.time() + log.debug('calling _get_message block=%s timeout=%s', block, block_time) + block_next_call = block is True or block > len(messages) + result = self._get_message(block_next_call, block_time, + get_partition_info=True, + update_offset=False) + log.debug('got %s from _get_messages', result) + if not result: + if block_next_call and (timeout is None or time.time() <= timeout): + continue + break + + partition, message = result + _msg = (partition, message) if self.partition_info else message + messages.append(_msg) + new_offsets[partition] = message.offset + 1 + + # Update and commit offsets if necessary + self.offsets.update(new_offsets) + self.count_since_commit += len(messages) + self._auto_commit() + log.debug('got %d messages: %s', len(messages), messages) + return messages + + def get_message(self, block=True, timeout=0.1, get_partition_info=None): + return self._get_message(block, timeout, get_partition_info) + + def _get_message(self, block=True, timeout=0.1, get_partition_info=None, + update_offset=True): + """ + If no messages can be fetched, returns None. + If get_partition_info is None, it defaults to self.partition_info + If get_partition_info is True, returns (partition, message) + If get_partition_info is False, returns message + """ + start_at = time.time() + while self.queue.empty(): + # We're out of messages, go grab some more. + log.debug('internal queue empty, fetching more messages') + with FetchContext(self, block, timeout): + self._fetch() + + if not block or time.time() > (start_at + timeout): + break + + try: + partition, message = self.queue.get_nowait() + + if update_offset: + # Update partition offset + self.offsets[partition] = message.offset + 1 + + # Count, check and commit messages if necessary + self.count_since_commit += 1 + self._auto_commit() + + if get_partition_info is None: + get_partition_info = self.partition_info + if get_partition_info: + return partition, message + else: + return message + except queue.Empty: + log.debug('internal queue empty after fetch - returning None') + return None + + def __iter__(self): + if self.iter_timeout is None: + timeout = ITER_TIMEOUT_SECONDS + else: + timeout = self.iter_timeout + + while True: + message = self.get_message(True, timeout) + if message: + yield message + elif self.iter_timeout is None: + # We did not receive any message yet but we don't have a + # timeout, so give up the CPU for a while before trying again + time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS) + else: + # Timed out waiting for a message + break + + def _fetch(self): + # Create fetch request payloads for all the partitions + partitions = dict((p, self.buffer_size) + for p in self.fetch_offsets.keys()) + while partitions: + requests = [] + for partition, buffer_size in six.iteritems(partitions): + requests.append(FetchRequest(self.topic, partition, + self.fetch_offsets[partition], + buffer_size)) + # Send request + responses = self.client.send_fetch_request( + requests, + max_wait_time=int(self.fetch_max_wait_time), + min_bytes=self.fetch_min_bytes, + fail_on_error=False + ) + + retry_partitions = {} + for resp in responses: + + try: + check_error(resp) + except UnknownTopicOrPartitionError: + log.error('UnknownTopicOrPartitionError for %s:%d', + resp.topic, resp.partition) + self.client.reset_topic_metadata(resp.topic) + raise + except 
NotLeaderForPartitionError: + log.error('NotLeaderForPartitionError for %s:%d', + resp.topic, resp.partition) + self.client.reset_topic_metadata(resp.topic) + continue + except OffsetOutOfRangeError: + log.warning('OffsetOutOfRangeError for %s:%d. ' + 'Resetting partition offset...', + resp.topic, resp.partition) + self.reset_partition_offset(resp.partition) + # Retry this partition + retry_partitions[resp.partition] = partitions[resp.partition] + continue + except FailedPayloadsError as e: + log.warning('FailedPayloadsError for %s:%d', + e.payload.topic, e.payload.partition) + # Retry this partition + retry_partitions[e.payload.partition] = partitions[e.payload.partition] + continue + + partition = resp.partition + buffer_size = partitions[partition] + try: + for message in resp.messages: + if message.offset < self.fetch_offsets[partition]: + log.debug('Skipping message %s because its offset is less than the consumer offset', + message) + continue + # Put the message in our queue + self.queue.put((partition, message)) + self.fetch_offsets[partition] = message.offset + 1 + except ConsumerFetchSizeTooSmall: + if (self.max_buffer_size is not None and + buffer_size == self.max_buffer_size): + log.error('Max fetch size %d too small', + self.max_buffer_size) + raise + if self.max_buffer_size is None: + buffer_size *= 2 + else: + buffer_size = min(buffer_size * 2, + self.max_buffer_size) + log.warning('Fetch size too small, increase to %d (2x) ' + 'and retry', buffer_size) + retry_partitions[partition] = buffer_size + except ConsumerNoMoreData as e: + log.debug('Iteration was ended by %r', e) + except StopIteration: + # Stop iterating through this partition + log.debug('Done iterating over partition %s', partition) + partitions = retry_partitions diff --git a/monasca_common/kafka_lib/context.py b/monasca_common/kafka_lib/context.py new file mode 100644 index 00000000..ba59a389 --- /dev/null +++ b/monasca_common/kafka_lib/context.py @@ -0,0 +1,175 @@ +""" +Context manager to commit/rollback consumer offsets. +""" +from logging import getLogger + +from monasca_common.kafka_lib.common import check_error, OffsetCommitRequest, OffsetOutOfRangeError + + +class OffsetCommitContext(object): + """ + Provides commit/rollback semantics around a `SimpleConsumer`. + + Usage assumes that `auto_commit` is disabled, that messages are consumed in + batches, and that the consuming process will record its own successful + processing of each message. Both the commit and rollback operations respect + a "high-water mark" to ensure that last unsuccessfully processed message + will be retried. + + Example: + + .. code:: python + + consumer = SimpleConsumer(client, group, topic, auto_commit=False) + consumer.provide_partition_info() + consumer.fetch_last_known_offsets() + + while some_condition: + with OffsetCommitContext(consumer) as context: + messages = consumer.get_messages(count, block=False) + + for partition, message in messages: + if can_process(message): + context.mark(partition, message.offset) + else: + break + + if not context: + sleep(delay) + + + These semantics allow for deferred message processing (e.g. if `can_process` + compares message time to clock time) and for repeated processing of the last + unsuccessful message (until some external error is resolved). 
+ """ + + def __init__(self, consumer): + """ + :param consumer: an instance of `SimpleConsumer` + """ + self.consumer = consumer + self.initial_offsets = None + self.high_water_mark = None + self.logger = getLogger("kafka.context") + + def mark(self, partition, offset): + """ + Set the high-water mark in the current context. + + In order to know the current partition, it is helpful to initialize + the consumer to provide partition info via: + + .. code:: python + + consumer.provide_partition_info() + + """ + max_offset = max(offset + 1, self.high_water_mark.get(partition, 0)) + + self.logger.debug("Setting high-water mark to: %s", + {partition: max_offset}) + + self.high_water_mark[partition] = max_offset + + def __nonzero__(self): + """ + Return whether any operations were marked in the context. + """ + return bool(self.high_water_mark) + + def __enter__(self): + """ + Start a new context: + + - Record the initial offsets for rollback + - Reset the high-water mark + """ + self.initial_offsets = dict(self.consumer.offsets) + self.high_water_mark = dict() + + self.logger.debug("Starting context at: %s", self.initial_offsets) + + return self + + def __exit__(self, exc_type, exc_value, traceback): + """ + End a context. + + - If there was no exception, commit up to the current high-water mark. + - If there was an offset of range error, attempt to find the correct + initial offset. + - If there was any other error, roll back to the initial offsets. + """ + if exc_type is None: + self.commit() + elif isinstance(exc_value, OffsetOutOfRangeError): + self.handle_out_of_range() + return True + else: + self.rollback() + + def commit(self): + """ + Commit this context's offsets: + + - If the high-water mark has moved, commit up to and position the + consumer at the high-water mark. + - Otherwise, reset to the consumer to the initial offsets. + """ + if self.high_water_mark: + self.logger.info("Committing offsets: %s", self.high_water_mark) + self.commit_partition_offsets(self.high_water_mark) + self.update_consumer_offsets(self.high_water_mark) + else: + self.update_consumer_offsets(self.initial_offsets) + + def rollback(self): + """ + Rollback this context: + + - Position the consumer at the initial offsets. + """ + self.logger.info("Rolling back context: %s", self.initial_offsets) + self.update_consumer_offsets(self.initial_offsets) + + def commit_partition_offsets(self, partition_offsets): + """ + Commit explicit partition/offset pairs. + """ + self.logger.debug("Committing partition offsets: %s", partition_offsets) + + commit_requests = [ + OffsetCommitRequest(self.consumer.topic, partition, offset, None) + for partition, offset in partition_offsets.items() + ] + commit_responses = self.consumer.client.send_offset_commit_request( + self.consumer.group, + commit_requests, + ) + for commit_response in commit_responses: + check_error(commit_response) + + def update_consumer_offsets(self, partition_offsets): + """ + Update consumer offsets to explicit positions. + """ + self.logger.debug("Updating consumer offsets to: %s", partition_offsets) + + for partition, offset in partition_offsets.items(): + self.consumer.offsets[partition] = offset + + # consumer keeps other offset states beyond its `offsets` dictionary, + # a relative seek with zero delta forces the consumer to reset to the + # current value of the `offsets` dictionary + self.consumer.seek(0, 1) + + def handle_out_of_range(self): + """ + Handle out of range condition by seeking to the beginning of valid + ranges. 
+ + This assumes that an out of range doesn't happen by seeking past the end + of valid ranges -- which is far less likely. + """ + self.logger.info("Seeking beginning of partition on out of range error") + self.consumer.seek(0, 0) diff --git a/monasca_common/kafka_lib/partitioner/__init__.py b/monasca_common/kafka_lib/partitioner/__init__.py new file mode 100644 index 00000000..5b6ac2d4 --- /dev/null +++ b/monasca_common/kafka_lib/partitioner/__init__.py @@ -0,0 +1,7 @@ +from .roundrobin import RoundRobinPartitioner +from .hashed import HashedPartitioner, Murmur2Partitioner, LegacyPartitioner + +__all__ = [ + 'RoundRobinPartitioner', 'HashedPartitioner', 'Murmur2Partitioner', + 'LegacyPartitioner' +] diff --git a/monasca_common/kafka_lib/partitioner/base.py b/monasca_common/kafka_lib/partitioner/base.py new file mode 100644 index 00000000..857f634d --- /dev/null +++ b/monasca_common/kafka_lib/partitioner/base.py @@ -0,0 +1,24 @@ + +class Partitioner(object): + """ + Base class for a partitioner + """ + def __init__(self, partitions): + """ + Initialize the partitioner + + Arguments: + partitions: A list of available partitions (during startup) + """ + self.partitions = partitions + + def partition(self, key, partitions=None): + """ + Takes a string key and num_partitions as argument and returns + a partition to be used for the message + + Arguments: + key: the key to use for partitioning + partitions: (optional) a list of partitions. + """ + raise NotImplementedError('partition function has to be implemented') diff --git a/monasca_common/kafka_lib/partitioner/hashed.py b/monasca_common/kafka_lib/partitioner/hashed.py new file mode 100644 index 00000000..d5d6d27c --- /dev/null +++ b/monasca_common/kafka_lib/partitioner/hashed.py @@ -0,0 +1,110 @@ +import six + +from .base import Partitioner + + +class Murmur2Partitioner(Partitioner): + """ + Implements a partitioner which selects the target partition based on + the hash of the key. Attempts to apply the same hashing + function as mainline java client. + """ + def partition(self, key, partitions=None): + if not partitions: + partitions = self.partitions + + # https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java#L69 + idx = (murmur2(key) & 0x7fffffff) % len(partitions) + + return partitions[idx] + + +class LegacyPartitioner(Partitioner): + """DEPRECATED -- See Issue 374 + + Implements a partitioner which selects the target partition based on + the hash of the key + """ + def partition(self, key, partitions=None): + if not partitions: + partitions = self.partitions + size = len(partitions) + idx = hash(key) % size + + return partitions[idx] + + +# Default will change to Murmur2 in 0.10 release +HashedPartitioner = LegacyPartitioner + + +# https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244 +def murmur2(key): + """Pure-python Murmur2 implementation. + + Based on java client, see org.apache.kafka.common.utils.Utils.murmur2 + + Args: + key: if not a bytes type, encoded using default encoding + + Returns: MurmurHash2 of key bytearray + """ + + # Convert key to bytes or bytearray + if isinstance(key, bytearray) or (six.PY3 and isinstance(key, bytes)): + data = key + else: + data = bytearray(str(key).encode()) + + length = len(data) + seed = 0x9747b28c + # 'm' and 'r' are mixing constants generated offline. + # They're not really 'magic', they just happen to work well. 
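+    # Python ints never overflow, so the "& 0xffffffff" masks below emulate
+    # Java's 32-bit arithmetic, and "(x % 0x100000000) >> n" stands in for
+    # Java's unsigned right shift (x >>> n).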
+ m = 0x5bd1e995 + r = 24 + + # Initialize the hash to a random value + h = seed ^ length + length4 = length // 4 + + for i in range(length4): + i4 = i * 4 + k = ((data[i4 + 0] & 0xff) + + ((data[i4 + 1] & 0xff) << 8) + + ((data[i4 + 2] & 0xff) << 16) + + ((data[i4 + 3] & 0xff) << 24)) + k &= 0xffffffff + k *= m + k &= 0xffffffff + k ^= (k % 0x100000000) >> r # k ^= k >>> r + k &= 0xffffffff + k *= m + k &= 0xffffffff + + h *= m + h &= 0xffffffff + h ^= k + h &= 0xffffffff + + # Handle the last few bytes of the input array + extra_bytes = length % 4 + if extra_bytes >= 3: + h ^= (data[(length & ~3) + 2] & 0xff) << 16 + h &= 0xffffffff + if extra_bytes >= 2: + h ^= (data[(length & ~3) + 1] & 0xff) << 8 + h &= 0xffffffff + if extra_bytes >= 1: + h ^= (data[length & ~3] & 0xff) + h &= 0xffffffff + h *= m + h &= 0xffffffff + + h ^= (h % 0x100000000) >> 13 # h >>> 13; + h &= 0xffffffff + h *= m + h &= 0xffffffff + h ^= (h % 0x100000000) >> 15 # h >>> 15; + h &= 0xffffffff + + return h diff --git a/monasca_common/kafka_lib/partitioner/roundrobin.py b/monasca_common/kafka_lib/partitioner/roundrobin.py new file mode 100644 index 00000000..6439e532 --- /dev/null +++ b/monasca_common/kafka_lib/partitioner/roundrobin.py @@ -0,0 +1,23 @@ +from itertools import cycle + +from .base import Partitioner + +class RoundRobinPartitioner(Partitioner): + """ + Implements a round robin partitioner which sends data to partitions + in a round robin fashion + """ + def __init__(self, partitions): + super(RoundRobinPartitioner, self).__init__(partitions) + self.iterpart = cycle(partitions) + + def _set_partitions(self, partitions): + self.partitions = partitions + self.iterpart = cycle(partitions) + + def partition(self, key, partitions=None): + # Refresh the partition list if necessary + if partitions and self.partitions != partitions: + self._set_partitions(partitions) + + return next(self.iterpart) diff --git a/monasca_common/kafka_lib/producer/__init__.py b/monasca_common/kafka_lib/producer/__init__.py new file mode 100644 index 00000000..bc0e7c61 --- /dev/null +++ b/monasca_common/kafka_lib/producer/__init__.py @@ -0,0 +1,6 @@ +from .simple import SimpleProducer +from .keyed import KeyedProducer + +__all__ = [ + 'SimpleProducer', 'KeyedProducer' +] diff --git a/monasca_common/kafka_lib/producer/base.py b/monasca_common/kafka_lib/producer/base.py new file mode 100644 index 00000000..38394de2 --- /dev/null +++ b/monasca_common/kafka_lib/producer/base.py @@ -0,0 +1,462 @@ +from __future__ import absolute_import + +import atexit +import logging +import time + +try: + from queue import Empty, Full, Queue +except ImportError: + from Queue import Empty, Full, Queue +from collections import defaultdict + +from threading import Thread, Event + +import six + +from monasca_common.kafka_lib.common import ( + ProduceRequest, ProduceResponse, TopicAndPartition, RetryOptions, + kafka_errors, UnsupportedCodecError, FailedPayloadsError, + RequestTimedOutError, AsyncProducerQueueFull, UnknownError, + RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES +) + +from monasca_common.kafka_lib.protocol import CODEC_NONE, ALL_CODECS, create_message_set +from monasca_common.kafka_lib.util import kafka_bytestring + +log = logging.getLogger('kafka.producer') + +BATCH_SEND_DEFAULT_INTERVAL = 20 +BATCH_SEND_MSG_COUNT = 20 + +# unlimited +ASYNC_QUEUE_MAXSIZE = 0 +ASYNC_QUEUE_PUT_TIMEOUT = 0 +# unlimited retries by default +ASYNC_RETRY_LIMIT = None +ASYNC_RETRY_BACKOFF_MS = 100 +ASYNC_RETRY_ON_TIMEOUTS = True 
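Stepping back to the partitioners defined just above: HashedPartitioner currently aliases LegacyPartitioner, so a key may land on a different partition than the Java client's murmur2-based choice. A self-contained sketch (no broker needed; key and partition list are placeholders):

    from monasca_common.kafka_lib.partitioner import (
        HashedPartitioner, Murmur2Partitioner, RoundRobinPartitioner)

    partitions = [0, 1, 2, 3]
    print(HashedPartitioner(partitions).partition(b'metric-key'))   # hash()-based
    print(Murmur2Partitioner(partitions).partition(b'metric-key'))  # Java-compatible

    rr = RoundRobinPartitioner(partitions)
    print([rr.partition(None) for _ in range(6)])   # [0, 1, 2, 3, 0, 1]
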
+ASYNC_LOG_MESSAGES_ON_ERROR = True + +STOP_ASYNC_PRODUCER = -1 +ASYNC_STOP_TIMEOUT_SECS = 30 + +SYNC_FAIL_ON_ERROR_DEFAULT = True + + +def _send_upstream(queue, client, codec, batch_time, batch_size, + req_acks, ack_timeout, retry_options, stop_event, + log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR, + stop_timeout=ASYNC_STOP_TIMEOUT_SECS, + codec_compresslevel=None): + """Private method to manage producing messages asynchronously + + Listens on the queue for a specified number of messages or until + a specified timeout and then sends messages to the brokers in grouped + requests (one per broker). + + Messages placed on the queue should be tuples that conform to this format: + ((topic, partition), message, key) + + Currently does not mark messages with task_done. Do not attempt to join()! + + Arguments: + queue (threading.Queue): the queue from which to get messages + client (KafkaClient): instance to use for communicating with brokers + codec (kafka.protocol.ALL_CODECS): compression codec to use + batch_time (int): interval in seconds to send message batches + batch_size (int): count of messages that will trigger an immediate send + req_acks: required acks to use with ProduceRequests. see server protocol + ack_timeout: timeout to wait for required acks. see server protocol + retry_options (RetryOptions): settings for retry limits, backoff etc + stop_event (threading.Event): event to monitor for shutdown signal. + when this event is 'set', the producer will stop sending messages. + log_messages_on_error (bool, optional): log stringified message-contents + on any produce error, otherwise only log a hash() of the contents, + defaults to True. + stop_timeout (int or float, optional): number of seconds to continue + retrying messages after stop_event is set, defaults to 30. 
+ """ + request_tries = {} + + while not stop_event.is_set(): + try: + client.reinit() + except Exception as e: + log.warn('Async producer failed to connect to brokers; backoff for %s(ms) before retrying', retry_options.backoff_ms) + time.sleep(float(retry_options.backoff_ms) / 1000) + else: + break + + stop_at = None + while not (stop_event.is_set() and queue.empty() and not request_tries): + + # Handle stop_timeout + if stop_event.is_set(): + if not stop_at: + stop_at = stop_timeout + time.time() + if time.time() > stop_at: + log.debug('Async producer stopping due to stop_timeout') + break + + timeout = batch_time + count = batch_size + send_at = time.time() + timeout + msgset = defaultdict(list) + + # Merging messages will require a bit more work to manage correctly + # for now, dont look for new batches if we have old ones to retry + if request_tries: + count = 0 + log.debug('Skipping new batch collection to handle retries') + else: + log.debug('Batching size: %s, timeout: %s', count, timeout) + + # Keep fetching till we gather enough messages or a + # timeout is reached + while count > 0 and timeout >= 0: + try: + topic_partition, msg, key = queue.get(timeout=timeout) + except Empty: + break + + # Check if the controller has requested us to stop + if topic_partition == STOP_ASYNC_PRODUCER: + stop_event.set() + break + + # Adjust the timeout to match the remaining period + count -= 1 + timeout = send_at - time.time() + msgset[topic_partition].append((msg, key)) + + # Send collected requests upstream + for topic_partition, msg in msgset.items(): + messages = create_message_set(msg, codec, key, codec_compresslevel) + req = ProduceRequest(topic_partition.topic, + topic_partition.partition, + tuple(messages)) + request_tries[req] = 0 + + if not request_tries: + continue + + reqs_to_retry, error_cls = [], None + retry_state = { + 'do_backoff': False, + 'do_refresh': False + } + + def _handle_error(error_cls, request): + if issubclass(error_cls, RETRY_ERROR_TYPES) or (retry_options.retry_on_timeouts and issubclass(error_cls, RequestTimedOutError)): + reqs_to_retry.append(request) + if issubclass(error_cls, RETRY_BACKOFF_ERROR_TYPES): + retry_state['do_backoff'] |= True + if issubclass(error_cls, RETRY_REFRESH_ERROR_TYPES): + retry_state['do_refresh'] |= True + + requests = list(request_tries.keys()) + log.debug('Sending: %s', requests) + responses = client.send_produce_request(requests, + acks=req_acks, + timeout=ack_timeout, + fail_on_error=False) + + log.debug('Received: %s', responses) + for i, response in enumerate(responses): + error_cls = None + if isinstance(response, FailedPayloadsError): + error_cls = response.__class__ + orig_req = response.payload + + elif isinstance(response, ProduceResponse) and response.error: + error_cls = kafka_errors.get(response.error, UnknownError) + orig_req = requests[i] + + if error_cls: + _handle_error(error_cls, orig_req) + log.error('%s sending ProduceRequest (#%d of %d) ' + 'to %s:%d with msgs %s', + error_cls.__name__, (i + 1), len(requests), + orig_req.topic, orig_req.partition, + orig_req.messages if log_messages_on_error + else hash(orig_req.messages)) + + if not reqs_to_retry: + request_tries = {} + continue + + # doing backoff before next retry + if retry_state['do_backoff'] and retry_options.backoff_ms: + log.warn('Async producer backoff for %s(ms) before retrying', retry_options.backoff_ms) + time.sleep(float(retry_options.backoff_ms) / 1000) + + # refresh topic metadata before next retry + if retry_state['do_refresh']: + log.warn('Async 
producer forcing metadata refresh metadata before retrying') + try: + client.load_metadata_for_topics() + except Exception as e: + log.error("Async producer couldn't reload topic metadata. Error: `%s`", e.message) + + # Apply retry limit, dropping messages that are over + request_tries = dict( + (key, count + 1) + for (key, count) in request_tries.items() + if key in reqs_to_retry + and (retry_options.limit is None + or (count < retry_options.limit)) + ) + + # Log messages we are going to retry + for orig_req in request_tries.keys(): + log.info('Retrying ProduceRequest to %s:%d with msgs %s', + orig_req.topic, orig_req.partition, + orig_req.messages if log_messages_on_error + else hash(orig_req.messages)) + + if request_tries or not queue.empty(): + log.error('Stopped producer with {0} unsent messages' + .format(len(request_tries) + queue.qsize())) + + +class Producer(object): + """ + Base class to be used by producers + + Arguments: + client (KafkaClient): instance to use for broker communications. + If async=True, the background thread will use client.copy(), + which is expected to return a thread-safe object. + codec (kafka.protocol.ALL_CODECS): compression codec to use. + req_acks (int, optional): A value indicating the acknowledgements that + the server must receive before responding to the request, + defaults to 1 (local ack). + ack_timeout (int, optional): millisecond timeout to wait for the + configured req_acks, defaults to 1000. + sync_fail_on_error (bool, optional): whether sync producer should + raise exceptions (True), or just return errors (False), + defaults to True. + async (bool, optional): send message using a background thread, + defaults to False. + batch_send_every_n (int, optional): If async is True, messages are + sent in batches of this size, defaults to 20. + batch_send_every_t (int or float, optional): If async is True, + messages are sent immediately after this timeout in seconds, even + if there are fewer than batch_send_every_n, defaults to 20. + async_retry_limit (int, optional): number of retries for failed messages + or None for unlimited, defaults to None / unlimited. + async_retry_backoff_ms (int, optional): milliseconds to backoff on + failed messages, defaults to 100. + async_retry_on_timeouts (bool, optional): whether to retry on + RequestTimeoutError, defaults to True. + async_queue_maxsize (int, optional): limit to the size of the + internal message queue in number of messages (not size), defaults + to 0 (no limit). + async_queue_put_timeout (int or float, optional): timeout seconds + for queue.put in send_messages for async producers -- will only + apply if async_queue_maxsize > 0 and the queue is Full, + defaults to 0 (fail immediately on full queue). + async_log_messages_on_error (bool, optional): set to False and the + async producer will only log hash() contents on failed produce + requests, defaults to True (log full messages). Hash logging + will not allow you to identify the specific message that failed, + but it will allow you to match failures with retries. + async_stop_timeout (int or float, optional): seconds to continue + attempting to send queued messages after producer.stop(), + defaults to 30. + + Deprecated Arguments: + batch_send (bool, optional): If True, messages are sent by a background + thread in batches, defaults to False. 
Deprecated, use 'async' + """ + ACK_NOT_REQUIRED = 0 # No ack is required + ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log + ACK_AFTER_CLUSTER_COMMIT = -1 # Send response after data is committed + DEFAULT_ACK_TIMEOUT = 1000 + + def __init__(self, client, + req_acks=ACK_AFTER_LOCAL_WRITE, + ack_timeout=DEFAULT_ACK_TIMEOUT, + codec=None, + codec_compresslevel=None, + sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT, + async=False, + batch_send=False, # deprecated, use async + batch_send_every_n=BATCH_SEND_MSG_COUNT, + batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, + async_retry_limit=ASYNC_RETRY_LIMIT, + async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS, + async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS, + async_queue_maxsize=ASYNC_QUEUE_MAXSIZE, + async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT, + async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR, + async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS): + + if async: + assert batch_send_every_n > 0 + assert batch_send_every_t > 0 + assert async_queue_maxsize >= 0 + + self.client = client + self.async = async + self.req_acks = req_acks + self.ack_timeout = ack_timeout + self.stopped = False + + if codec is None: + codec = CODEC_NONE + elif codec not in ALL_CODECS: + raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec) + + self.codec = codec + self.codec_compresslevel = codec_compresslevel + + if self.async: + # Messages are sent through this queue + self.queue = Queue(async_queue_maxsize) + self.async_queue_put_timeout = async_queue_put_timeout + async_retry_options = RetryOptions( + limit=async_retry_limit, + backoff_ms=async_retry_backoff_ms, + retry_on_timeouts=async_retry_on_timeouts) + self.thread_stop_event = Event() + self.thread = Thread( + target=_send_upstream, + args=(self.queue, self.client.copy(), self.codec, + batch_send_every_t, batch_send_every_n, + self.req_acks, self.ack_timeout, + async_retry_options, self.thread_stop_event), + kwargs={'log_messages_on_error': async_log_messages_on_error, + 'stop_timeout': async_stop_timeout, + 'codec_compresslevel': self.codec_compresslevel} + ) + + # Thread will die if main thread exits + self.thread.daemon = True + self.thread.start() + + def cleanup(obj): + if not obj.stopped: + obj.stop() + self._cleanup_func = cleanup + atexit.register(cleanup, self) + else: + self.sync_fail_on_error = sync_fail_on_error + + def send_messages(self, topic, partition, *msg): + """ + Helper method to send produce requests + @param: topic, name of topic for produce request -- type str + @param: partition, partition number for produce request -- type int + @param: *msg, one or more message payloads -- type bytes + @returns: ResponseRequest returned by server + raises on error + + Note that msg type *must* be encoded to bytes by user. + Passing unicode message will not work, for example + you should encode before calling send_messages via + something like `unicode_message.encode('utf-8')` + + All messages produced via this method will set the message 'key' to Null + """ + topic = kafka_bytestring(topic) + return self._send_messages(topic, partition, *msg) + + def _send_messages(self, topic, partition, *msg, **kwargs): + key = kwargs.pop('key', None) + + # Guarantee that msg is actually a list or tuple (should always be true) + if not isinstance(msg, (list, tuple)): + raise TypeError("msg is not a list or tuple!") + + for m in msg: + # The protocol allows to have key & payload with null values both, + # (https://goo.gl/o694yN) but having (null,null) pair doesn't make sense. 
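+            # Note: a null payload with a non-null key is still meaningful --
+            # Kafka commonly treats keyed null messages as deletion markers
+            # ("tombstones") on log-compacted topics -- so only the
+            # (null, null) combination is rejected below.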
+ if m is None: + if key is None: + raise TypeError("key and payload can't be null in one") + # Raise TypeError if any non-null message is not encoded as bytes + elif not isinstance(m, six.binary_type): + raise TypeError("all produce message payloads must be null or type bytes") + + # Raise TypeError if topic is not encoded as bytes + if not isinstance(topic, six.binary_type): + raise TypeError("the topic must be type bytes") + + # Raise TypeError if the key is not encoded as bytes + if key is not None and not isinstance(key, six.binary_type): + raise TypeError("the key must be type bytes") + + if self.async: + for idx, m in enumerate(msg): + try: + item = (TopicAndPartition(topic, partition), m, key) + if self.async_queue_put_timeout == 0: + self.queue.put_nowait(item) + else: + self.queue.put(item, True, self.async_queue_put_timeout) + except Full: + raise AsyncProducerQueueFull( + msg[idx:], + 'Producer async queue overfilled. ' + 'Current queue size %d.' % self.queue.qsize()) + resp = [] + else: + messages = create_message_set([(m, key) for m in msg], self.codec, key, self.codec_compresslevel) + req = ProduceRequest(topic, partition, messages) + try: + resp = self.client.send_produce_request( + [req], acks=self.req_acks, timeout=self.ack_timeout, + fail_on_error=self.sync_fail_on_error + ) + except Exception: + log.exception("Unable to send messages") + raise + return resp + + def stop(self, timeout=None): + """ + Stop the producer (async mode). Blocks until async thread completes. + """ + if timeout is not None: + log.warning('timeout argument to stop() is deprecated - ' + 'it will be removed in future release') + + if not self.async: + log.warning('producer.stop() called, but producer is not async') + return + + if self.stopped: + log.warning('producer.stop() called, but producer is already stopped') + return + + if self.async: + self.queue.put((STOP_ASYNC_PRODUCER, None, None)) + self.thread_stop_event.set() + self.thread.join() + + if hasattr(self, '_cleanup_func'): + # Remove cleanup handler now that we've stopped + + # py3 supports unregistering + if hasattr(atexit, 'unregister'): + atexit.unregister(self._cleanup_func) # pylint: disable=no-member + + # py2 requires removing from private attribute... + else: + + # ValueError on list.remove() if the exithandler no longer exists + # but that is fine here + try: + atexit._exithandlers.remove((self._cleanup_func, (self,), {})) + except ValueError: + pass + + del self._cleanup_func + + self.stopped = True + + def __del__(self): + if not self.stopped: + self.stop() diff --git a/monasca_common/kafka_lib/producer/keyed.py b/monasca_common/kafka_lib/producer/keyed.py new file mode 100644 index 00000000..a5a26c95 --- /dev/null +++ b/monasca_common/kafka_lib/producer/keyed.py @@ -0,0 +1,51 @@ +from __future__ import absolute_import + +import logging +import warnings + +from .base import Producer +from ..partitioner import HashedPartitioner +from ..util import kafka_bytestring + + +log = logging.getLogger(__name__) + + +class KeyedProducer(Producer): + """ + A producer which distributes messages to partitions based on the key + + See Producer class for Arguments + + Additional Arguments: + partitioner: A partitioner class that will be used to get the partition + to send the message to. Must be derived from Partitioner. + Defaults to HashedPartitioner. 
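+
+    Example (illustrative only; broker address and topic are placeholders,
+    KafkaClient comes from monasca_common.kafka_lib.client):
+
+        client = KafkaClient('localhost:9092')
+        producer = KeyedProducer(client)
+        producer.send_messages(b'my-topic', b'key1', b'some message')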
+    """
+    def __init__(self, *args, **kwargs):
+        self.partitioner_class = kwargs.pop('partitioner', HashedPartitioner)
+        self.partitioners = {}
+        super(KeyedProducer, self).__init__(*args, **kwargs)
+
+    def _next_partition(self, topic, key):
+        if topic not in self.partitioners:
+            if not self.client.has_metadata_for_topic(topic):
+                self.client.load_metadata_for_topics(topic)
+
+            self.partitioners[topic] = self.partitioner_class(self.client.get_partition_ids_for_topic(topic))
+
+        partitioner = self.partitioners[topic]
+        return partitioner.partition(key)
+
+    def send_messages(self, topic, key, *msg):
+        topic = kafka_bytestring(topic)
+        partition = self._next_partition(topic, key)
+        return self._send_messages(topic, partition, *msg, key=key)
+
+    # DEPRECATED
+    def send(self, topic, key, msg):
+        warnings.warn("KeyedProducer.send is deprecated in favor of send_messages", DeprecationWarning)
+        return self.send_messages(topic, key, msg)
+
+    def __repr__(self):
+        return '<KeyedProducer batch=%s>' % self.async
diff --git a/monasca_common/kafka_lib/producer/simple.py b/monasca_common/kafka_lib/producer/simple.py
new file mode 100644
index 00000000..13e60d98
--- /dev/null
+++ b/monasca_common/kafka_lib/producer/simple.py
@@ -0,0 +1,58 @@
+from __future__ import absolute_import
+
+from itertools import cycle
+import logging
+import random
+import six
+
+from six.moves import xrange
+
+from .base import Producer
+
+
+log = logging.getLogger(__name__)
+
+
+class SimpleProducer(Producer):
+    """A simple, round-robin producer.
+
+    See Producer class for Base Arguments
+
+    Additional Arguments:
+        random_start (bool, optional): randomize the initial partition which
+            the first message block will be published to, otherwise
+            if false, the first message block will always publish
+            to partition 0 before cycling through each partition,
+            defaults to True.
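+
+    Example (illustrative only; broker address and topic are placeholders,
+    KafkaClient comes from monasca_common.kafka_lib.client):
+
+        client = KafkaClient('localhost:9092')
+        producer = SimpleProducer(client)
+        producer.send_messages(b'my-topic', b'message one', b'message two')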
+ """ + def __init__(self, *args, **kwargs): + self.partition_cycles = {} + self.random_start = kwargs.pop('random_start', True) + super(SimpleProducer, self).__init__(*args, **kwargs) + + def _next_partition(self, topic): + if topic not in self.partition_cycles: + if not self.client.has_metadata_for_topic(topic): + self.client.load_metadata_for_topics(topic) + + self.partition_cycles[topic] = cycle(self.client.get_partition_ids_for_topic(topic)) + + # Randomize the initial partition that is returned + if self.random_start: + num_partitions = len(self.client.get_partition_ids_for_topic(topic)) + for _ in xrange(random.randint(0, num_partitions-1)): + next(self.partition_cycles[topic]) + + return next(self.partition_cycles[topic]) + + def send_messages(self, topic, *msg): + if not isinstance(topic, six.binary_type): + topic = topic.encode('utf-8') + + partition = self._next_partition(topic) + return super(SimpleProducer, self).send_messages( + topic, partition, *msg + ) + + def __repr__(self): + return '' % self.async diff --git a/monasca_common/kafka_lib/protocol.py b/monasca_common/kafka_lib/protocol.py new file mode 100644 index 00000000..51b586af --- /dev/null +++ b/monasca_common/kafka_lib/protocol.py @@ -0,0 +1,646 @@ +import logging +import struct + +import six + +from six.moves import xrange + +from monasca_common.kafka_lib.codec import ( + gzip_encode, gzip_decode, snappy_encode, snappy_decode +) +from monasca_common.kafka_lib.common import ( + Message, OffsetAndMessage, TopicAndPartition, + BrokerMetadata, TopicMetadata, PartitionMetadata, + MetadataResponse, ProduceResponse, FetchResponse, + OffsetResponse, OffsetCommitResponse, OffsetFetchResponse, + ProtocolError, BufferUnderflowError, ChecksumError, + ConsumerFetchSizeTooSmall, UnsupportedCodecError, + ConsumerMetadataResponse +) +from monasca_common.kafka_lib.util import ( + crc32, read_short_string, read_int_string, relative_unpack, + write_short_string, write_int_string, group_by_topic_and_partition +) + + +log = logging.getLogger(__name__) + +ATTRIBUTE_CODEC_MASK = 0x03 +CODEC_NONE = 0x00 +CODEC_GZIP = 0x01 +CODEC_SNAPPY = 0x02 +ALL_CODECS = (CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY) + + +class KafkaProtocol(object): + """ + Class to encapsulate all of the protocol encoding/decoding. + This class does not have any state associated with it, it is purely + for organization. + """ + PRODUCE_KEY = 0 + FETCH_KEY = 1 + OFFSET_KEY = 2 + METADATA_KEY = 3 + OFFSET_COMMIT_KEY = 8 + OFFSET_FETCH_KEY = 9 + CONSUMER_METADATA_KEY = 10 + + ################### + # Private API # + ################### + + @classmethod + def _encode_message_header(cls, client_id, correlation_id, request_key, + version=0): + """ + Encode the common request envelope + """ + return struct.pack('>hhih%ds' % len(client_id), + request_key, # ApiKey + version, # ApiVersion + correlation_id, # CorrelationId + len(client_id), # ClientId size + client_id) # ClientId + + @classmethod + def _encode_message_set(cls, messages): + """ + Encode a MessageSet. Unlike other arrays in the protocol, + MessageSets are not length-prefixed + + Format + ====== + MessageSet => [Offset MessageSize Message] + Offset => int64 + MessageSize => int32 + """ + message_set = [] + for message in messages: + encoded_message = KafkaProtocol._encode_message(message) + message_set.append(struct.pack('>qi%ds' % len(encoded_message), 0, + len(encoded_message), + encoded_message)) + return b''.join(message_set) + + @classmethod + def _encode_message(cls, message): + """ + Encode a single message. 
+ + The magic number of a message is a format version number. + The only supported magic number right now is zero + + Format + ====== + Message => Crc MagicByte Attributes Key Value + Crc => int32 + MagicByte => int8 + Attributes => int8 + Key => bytes + Value => bytes + """ + if message.magic == 0: + msg = b''.join([ + struct.pack('>BB', message.magic, message.attributes), + write_int_string(message.key), + write_int_string(message.value) + ]) + crc = crc32(msg) + msg = struct.pack('>I%ds' % len(msg), crc, msg) + else: + raise ProtocolError("Unexpected magic number: %d" % message.magic) + return msg + + @classmethod + def _decode_message_set_iter(cls, data): + """ + Iteratively decode a MessageSet + + Reads repeated elements of (offset, message), calling decode_message + to decode a single message. Since compressed messages contain futher + MessageSets, these two methods have been decoupled so that they may + recurse easily. + """ + cur = 0 + read_message = False + while cur < len(data): + try: + ((offset, ), cur) = relative_unpack('>q', data, cur) + (msg, cur) = read_int_string(data, cur) + for (offset, message) in KafkaProtocol._decode_message(msg, offset): + read_message = True + yield OffsetAndMessage(offset, message) + except BufferUnderflowError: + # NOTE: Not sure this is correct error handling: + # Is it possible to get a BUE if the message set is somewhere + # in the middle of the fetch response? If so, we probably have + # an issue that's not fetch size too small. + # Aren't we ignoring errors if we fail to unpack data by + # raising StopIteration()? + # If _decode_message() raises a ChecksumError, couldn't that + # also be due to the fetch size being too small? + if read_message is False: + # If we get a partial read of a message, but haven't + # yielded anything there's a problem + raise ConsumerFetchSizeTooSmall() + else: + raise StopIteration() + + @classmethod + def _decode_message(cls, data, offset): + """ + Decode a single Message + + The only caller of this method is decode_message_set_iter. + They are decoupled to support nested messages (compressed MessageSets). + The offset is actually read from decode_message_set_iter (it is part + of the MessageSet payload). + """ + ((crc, magic, att), cur) = relative_unpack('>IBB', data, 0) + if crc != crc32(data[4:]): + raise ChecksumError("Message checksum failed") + + (key, cur) = read_int_string(data, cur) + (value, cur) = read_int_string(data, cur) + + codec = att & ATTRIBUTE_CODEC_MASK + + if codec == CODEC_NONE: + yield (offset, Message(magic, att, key, value)) + + elif codec == CODEC_GZIP: + gz = gzip_decode(value) + for (offset, msg) in KafkaProtocol._decode_message_set_iter(gz): + yield (offset, msg) + + elif codec == CODEC_SNAPPY: + snp = snappy_decode(value) + for (offset, msg) in KafkaProtocol._decode_message_set_iter(snp): + yield (offset, msg) + + ################## + # Public API # + ################## + + @classmethod + def encode_produce_request(cls, client_id, correlation_id, + payloads=None, acks=1, timeout=1000): + """ + Encode some ProduceRequest structs + + Arguments: + client_id: string + correlation_id: int + payloads: list of ProduceRequest + acks: How "acky" you want the request to be + 0: immediate response + 1: written to disk by the leader + 2+: waits for this many number of replicas to sync + -1: waits for all replicas to be in sync + timeout: Maximum time the server will wait for acks from replicas. 
+ This is _not_ a socket timeout + + """ + payloads = [] if payloads is None else payloads + grouped_payloads = group_by_topic_and_partition(payloads) + + message = [] + message.append(cls._encode_message_header(client_id, correlation_id, + KafkaProtocol.PRODUCE_KEY)) + + message.append(struct.pack('>hii', acks, timeout, + len(grouped_payloads))) + + for topic, topic_payloads in grouped_payloads.items(): + message.append(struct.pack('>h%dsi' % len(topic), len(topic), topic, + len(topic_payloads))) + + for partition, payload in topic_payloads.items(): + msg_set = KafkaProtocol._encode_message_set(payload.messages) + message.append(struct.pack('>ii%ds' % len(msg_set), partition, + len(msg_set), msg_set)) + + msg = b''.join(message) + return struct.pack('>i%ds' % len(msg), len(msg), msg) + + @classmethod + def decode_produce_response(cls, data): + """ + Decode bytes to a ProduceResponse + + Arguments: + data: bytes to decode + + """ + ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0) + + for _ in range(num_topics): + ((strlen,), cur) = relative_unpack('>h', data, cur) + topic = data[cur:cur + strlen] + cur += strlen + ((num_partitions,), cur) = relative_unpack('>i', data, cur) + for _ in range(num_partitions): + ((partition, error, offset), cur) = relative_unpack('>ihq', + data, cur) + + yield ProduceResponse(topic, partition, error, offset) + + @classmethod + def encode_fetch_request(cls, client_id, correlation_id, payloads=None, + max_wait_time=100, min_bytes=4096): + """ + Encodes some FetchRequest structs + + Arguments: + client_id: string + correlation_id: int + payloads: list of FetchRequest + max_wait_time: int, how long to block waiting on min_bytes of data + min_bytes: int, the minimum number of bytes to accumulate before + returning the response + """ + + payloads = [] if payloads is None else payloads + grouped_payloads = group_by_topic_and_partition(payloads) + + message = [] + message.append(cls._encode_message_header(client_id, correlation_id, + KafkaProtocol.FETCH_KEY)) + + # -1 is the replica id + message.append(struct.pack('>iiii', -1, max_wait_time, min_bytes, + len(grouped_payloads))) + + for topic, topic_payloads in grouped_payloads.items(): + message.append(write_short_string(topic)) + message.append(struct.pack('>i', len(topic_payloads))) + for partition, payload in topic_payloads.items(): + message.append(struct.pack('>iqi', partition, payload.offset, + payload.max_bytes)) + + msg = b''.join(message) + return struct.pack('>i%ds' % len(msg), len(msg), msg) + + @classmethod + def decode_fetch_response(cls, data): + """ + Decode bytes to a FetchResponse + + Arguments: + data: bytes to decode + """ + ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0) + + for _ in range(num_topics): + (topic, cur) = read_short_string(data, cur) + ((num_partitions,), cur) = relative_unpack('>i', data, cur) + + for j in range(num_partitions): + ((partition, error, highwater_mark_offset), cur) = \ + relative_unpack('>ihq', data, cur) + + (message_set, cur) = read_int_string(data, cur) + + yield FetchResponse( + topic, partition, error, + highwater_mark_offset, + KafkaProtocol._decode_message_set_iter(message_set)) + + @classmethod + def encode_offset_request(cls, client_id, correlation_id, payloads=None): + payloads = [] if payloads is None else payloads + grouped_payloads = group_by_topic_and_partition(payloads) + + message = [] + message.append(cls._encode_message_header(client_id, correlation_id, + KafkaProtocol.OFFSET_KEY)) + + # -1 is the replica id + 
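+        # OffsetRequest body starts with ReplicaId (int32, always -1 for a
+        # normal client) followed by the number of topics in this request.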
message.append(struct.pack('>ii', -1, len(grouped_payloads))) + + for topic, topic_payloads in grouped_payloads.items(): + message.append(write_short_string(topic)) + message.append(struct.pack('>i', len(topic_payloads))) + + for partition, payload in topic_payloads.items(): + message.append(struct.pack('>iqi', partition, payload.time, + payload.max_offsets)) + + msg = b''.join(message) + return struct.pack('>i%ds' % len(msg), len(msg), msg) + + @classmethod + def decode_offset_response(cls, data): + """ + Decode bytes to an OffsetResponse + + Arguments: + data: bytes to decode + """ + ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0) + + for _ in range(num_topics): + (topic, cur) = read_short_string(data, cur) + ((num_partitions,), cur) = relative_unpack('>i', data, cur) + + for _ in range(num_partitions): + ((partition, error, num_offsets,), cur) = \ + relative_unpack('>ihi', data, cur) + + offsets = [] + for k in range(num_offsets): + ((offset,), cur) = relative_unpack('>q', data, cur) + offsets.append(offset) + + yield OffsetResponse(topic, partition, error, tuple(offsets)) + + @classmethod + def encode_metadata_request(cls, client_id, correlation_id, topics=None, + payloads=None): + """ + Encode a MetadataRequest + + Arguments: + client_id: string + correlation_id: int + topics: list of strings + """ + if payloads is None: + topics = [] if topics is None else topics + else: + topics = payloads + + message = [] + message.append(cls._encode_message_header(client_id, correlation_id, + KafkaProtocol.METADATA_KEY)) + + message.append(struct.pack('>i', len(topics))) + + for topic in topics: + message.append(struct.pack('>h%ds' % len(topic), len(topic), topic)) + + msg = b''.join(message) + return write_int_string(msg) + + @classmethod + def decode_metadata_response(cls, data): + """ + Decode bytes to a MetadataResponse + + Arguments: + data: bytes to decode + """ + ((correlation_id, numbrokers), cur) = relative_unpack('>ii', data, 0) + + # Broker info + brokers = [] + for _ in range(numbrokers): + ((nodeId, ), cur) = relative_unpack('>i', data, cur) + (host, cur) = read_short_string(data, cur) + ((port,), cur) = relative_unpack('>i', data, cur) + brokers.append(BrokerMetadata(nodeId, host, port)) + + # Topic info + ((num_topics,), cur) = relative_unpack('>i', data, cur) + topic_metadata = [] + + for _ in range(num_topics): + ((topic_error,), cur) = relative_unpack('>h', data, cur) + (topic_name, cur) = read_short_string(data, cur) + ((num_partitions,), cur) = relative_unpack('>i', data, cur) + partition_metadata = [] + + for _ in range(num_partitions): + ((partition_error_code, partition, leader, numReplicas), cur) = \ + relative_unpack('>hiii', data, cur) + + (replicas, cur) = relative_unpack( + '>%di' % numReplicas, data, cur) + + ((num_isr,), cur) = relative_unpack('>i', data, cur) + (isr, cur) = relative_unpack('>%di' % num_isr, data, cur) + + partition_metadata.append( + PartitionMetadata(topic_name, partition, leader, + replicas, isr, partition_error_code) + ) + + topic_metadata.append( + TopicMetadata(topic_name, topic_error, partition_metadata) + ) + + return MetadataResponse(brokers, topic_metadata) + + @classmethod + def encode_consumer_metadata_request(cls, client_id, correlation_id, payloads): + """ + Encode a ConsumerMetadataRequest + + Arguments: + client_id: string + correlation_id: int + payloads: string (consumer group) + """ + message = [] + message.append(cls._encode_message_header(client_id, correlation_id, + KafkaProtocol.CONSUMER_METADATA_KEY)) + 
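+        # The request body is just the consumer group name, encoded as a
+        # Kafka short string (int16 length prefix followed by the bytes).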
message.append(struct.pack('>h%ds' % len(payloads), len(payloads), payloads)) + + msg = b''.join(message) + return write_int_string(msg) + + @classmethod + def decode_consumer_metadata_response(cls, data): + """ + Decode bytes to a ConsumerMetadataResponse + + Arguments: + data: bytes to decode + """ + ((correlation_id, error, nodeId), cur) = relative_unpack('>ihi', data, 0) + (host, cur) = read_short_string(data, cur) + ((port,), cur) = relative_unpack('>i', data, cur) + + return ConsumerMetadataResponse(error, nodeId, host, port) + + @classmethod + def encode_offset_commit_request(cls, client_id, correlation_id, + group, payloads): + """ + Encode some OffsetCommitRequest structs + + Arguments: + client_id: string + correlation_id: int + group: string, the consumer group you are committing offsets for + payloads: list of OffsetCommitRequest + """ + grouped_payloads = group_by_topic_and_partition(payloads) + + message = [] + message.append(cls._encode_message_header(client_id, correlation_id, + KafkaProtocol.OFFSET_COMMIT_KEY)) + message.append(write_short_string(group)) + message.append(struct.pack('>i', len(grouped_payloads))) + + for topic, topic_payloads in grouped_payloads.items(): + message.append(write_short_string(topic)) + message.append(struct.pack('>i', len(topic_payloads))) + + for partition, payload in topic_payloads.items(): + message.append(struct.pack('>iq', partition, payload.offset)) + message.append(write_short_string(payload.metadata)) + + msg = b''.join(message) + return struct.pack('>i%ds' % len(msg), len(msg), msg) + + @classmethod + def decode_offset_commit_response(cls, data): + """ + Decode bytes to an OffsetCommitResponse + + Arguments: + data: bytes to decode + """ + ((correlation_id,), cur) = relative_unpack('>i', data, 0) + ((num_topics,), cur) = relative_unpack('>i', data, cur) + + for _ in xrange(num_topics): + (topic, cur) = read_short_string(data, cur) + ((num_partitions,), cur) = relative_unpack('>i', data, cur) + + for _ in xrange(num_partitions): + ((partition, error), cur) = relative_unpack('>ih', data, cur) + yield OffsetCommitResponse(topic, partition, error) + + @classmethod + def encode_offset_fetch_request(cls, client_id, correlation_id, + group, payloads, from_kafka=False): + """ + Encode some OffsetFetchRequest structs. The request is encoded using + version 0 if from_kafka is false, indicating a request for Zookeeper + offsets. It is encoded using version 1 otherwise, indicating a request + for Kafka offsets. 
+ + Arguments: + client_id: string + correlation_id: int + group: string, the consumer group you are fetching offsets for + payloads: list of OffsetFetchRequest + from_kafka: bool, default False, set True for Kafka-committed offsets + """ + grouped_payloads = group_by_topic_and_partition(payloads) + + message = [] + reqver = 1 if from_kafka else 0 + message.append(cls._encode_message_header(client_id, correlation_id, + KafkaProtocol.OFFSET_FETCH_KEY, + version=reqver)) + + message.append(write_short_string(group)) + message.append(struct.pack('>i', len(grouped_payloads))) + + for topic, topic_payloads in grouped_payloads.items(): + message.append(write_short_string(topic)) + message.append(struct.pack('>i', len(topic_payloads))) + + for partition, payload in topic_payloads.items(): + message.append(struct.pack('>i', partition)) + + msg = b''.join(message) + return struct.pack('>i%ds' % len(msg), len(msg), msg) + + @classmethod + def decode_offset_fetch_response(cls, data): + """ + Decode bytes to an OffsetFetchResponse + + Arguments: + data: bytes to decode + """ + + ((correlation_id,), cur) = relative_unpack('>i', data, 0) + ((num_topics,), cur) = relative_unpack('>i', data, cur) + + for _ in range(num_topics): + (topic, cur) = read_short_string(data, cur) + ((num_partitions,), cur) = relative_unpack('>i', data, cur) + + for _ in range(num_partitions): + ((partition, offset), cur) = relative_unpack('>iq', data, cur) + (metadata, cur) = read_short_string(data, cur) + ((error,), cur) = relative_unpack('>h', data, cur) + + yield OffsetFetchResponse(topic, partition, offset, + metadata, error) + + +def create_message(payload, key=None): + """ + Construct a Message + + Arguments: + payload: bytes, the payload to send to Kafka + key: bytes, a key used for partition routing (optional) + + """ + return Message(0, 0, key, payload) + + +def create_gzip_message(payloads, key=None, compresslevel=None): + """ + Construct a Gzipped Message containing multiple Messages + + The given payloads will be encoded, compressed, and sent as a single atomic + message to Kafka. + + Arguments: + payloads: list(bytes), a list of payload to send be sent to Kafka + key: bytes, a key used for partition routing (optional) + + """ + message_set = KafkaProtocol._encode_message_set( + [create_message(payload, pl_key) for payload, pl_key in payloads]) + + gzipped = gzip_encode(message_set, compresslevel=compresslevel) + codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP + + return Message(0, 0x00 | codec, key, gzipped) + + +def create_snappy_message(payloads, key=None): + """ + Construct a Snappy Message containing multiple Messages + + The given payloads will be encoded, compressed, and sent as a single atomic + message to Kafka. + + Arguments: + payloads: list(bytes), a list of payload to send be sent to Kafka + key: bytes, a key used for partition routing (optional) + + """ + message_set = KafkaProtocol._encode_message_set( + [create_message(payload, pl_key) for payload, pl_key in payloads]) + + snapped = snappy_encode(message_set) + codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY + + return Message(0, 0x00 | codec, key, snapped) + + +def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None): + """Create a message set using the given codec. + + If codec is CODEC_NONE, return a list of raw Kafka messages. Otherwise, + return a list containing a single codec-encoded message. 
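+
+    Example (illustrative; payloads and compression level are arbitrary):
+
+        msgs = [(b'payload-1', None), (b'payload-2', None)]
+        batch = create_message_set(msgs, codec=CODEC_GZIP, compresslevel=6)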
+ """ + if codec == CODEC_NONE: + return [create_message(m, k) for m, k in messages] + elif codec == CODEC_GZIP: + return [create_gzip_message(messages, key, compresslevel)] + elif codec == CODEC_SNAPPY: + return [create_snappy_message(messages, key)] + else: + raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec) diff --git a/monasca_common/kafka_lib/util.py b/monasca_common/kafka_lib/util.py new file mode 100644 index 00000000..058daf20 --- /dev/null +++ b/monasca_common/kafka_lib/util.py @@ -0,0 +1,159 @@ +import binascii +import collections +import struct +import sys +from threading import Thread, Event + +import six + +from monasca_common.kafka_lib.common import BufferUnderflowError + + +def crc32(data): + return binascii.crc32(data) & 0xffffffff + + +def write_int_string(s): + if s is not None and not isinstance(s, six.binary_type): + raise TypeError('Expected "%s" to be bytes\n' + 'data=%s' % (type(s), repr(s))) + if s is None: + return struct.pack('>i', -1) + else: + return struct.pack('>i%ds' % len(s), len(s), s) + + +def write_short_string(s): + if s is not None and not isinstance(s, six.binary_type): + raise TypeError('Expected "%s" to be bytes\n' + 'data=%s' % (type(s), repr(s))) + if s is None: + return struct.pack('>h', -1) + elif len(s) > 32767 and sys.version_info < (2, 7): + # Python 2.6 issues a deprecation warning instead of a struct error + raise struct.error(len(s)) + else: + return struct.pack('>h%ds' % len(s), len(s), s) + + +def read_short_string(data, cur): + if len(data) < cur + 2: + raise BufferUnderflowError("Not enough data left") + + (strlen,) = struct.unpack('>h', data[cur:cur + 2]) + if strlen == -1: + return None, cur + 2 + + cur += 2 + if len(data) < cur + strlen: + raise BufferUnderflowError("Not enough data left") + + out = data[cur:cur + strlen] + return out, cur + strlen + + +def read_int_string(data, cur): + if len(data) < cur + 4: + raise BufferUnderflowError( + "Not enough data left to read string len (%d < %d)" % + (len(data), cur + 4)) + + (strlen,) = struct.unpack('>i', data[cur:cur + 4]) + if strlen == -1: + return None, cur + 4 + + cur += 4 + if len(data) < cur + strlen: + raise BufferUnderflowError("Not enough data left") + + out = data[cur:cur + strlen] + return out, cur + strlen + + +def relative_unpack(fmt, data, cur): + size = struct.calcsize(fmt) + if len(data) < cur + size: + raise BufferUnderflowError("Not enough data left") + + out = struct.unpack(fmt, data[cur:cur + size]) + return out, cur + size + + +def group_by_topic_and_partition(tuples): + out = collections.defaultdict(dict) + for t in tuples: + assert t.topic not in out or t.partition not in out[t.topic], \ + 'Duplicate {0}s for {1} {2}'.format(t.__class__.__name__, + t.topic, t.partition) + out[t.topic][t.partition] = t + return out + + +def kafka_bytestring(s): + """ + Takes a string or bytes instance + Returns bytes, encoding strings in utf-8 as necessary + """ + if isinstance(s, six.binary_type): + return s + if isinstance(s, six.string_types): + return s.encode('utf-8') + raise TypeError(s) + + +class ReentrantTimer(object): + """ + A timer that can be restarted, unlike threading.Timer + (although this uses threading.Timer) + + Arguments: + + t: timer interval in milliseconds + fn: a callable to invoke + args: tuple of args to be passed to function + kwargs: keyword arguments to be passed to function + """ + def __init__(self, t, fn, *args, **kwargs): + + if t <= 0: + raise ValueError('Invalid timeout value') + + if not callable(fn): + raise ValueError('fn must 
be callable') + + self.thread = None + self.t = t / 1000.0 + self.fn = fn + self.args = args + self.kwargs = kwargs + self.active = None + + def _timer(self, active): + # python2.6 Event.wait() always returns None + # python2.7 and greater returns the flag value (true/false) + # we want the flag value, so add an 'or' here for python2.6 + # this is redundant for later python versions (FLAG OR FLAG == FLAG) + while not (active.wait(self.t) or active.is_set()): + self.fn(*self.args, **self.kwargs) + + def start(self): + if self.thread is not None: + self.stop() + + self.active = Event() + self.thread = Thread(target=self._timer, args=(self.active,)) + self.thread.daemon = True # So the app exits when main thread exits + self.thread.start() + + def stop(self): + if self.thread is None: + return + + self.active.set() + self.thread.join(self.t + 1) + # noinspection PyAttributeOutsideInit + self.timer = None + self.fn = None + + def __del__(self): + self.stop() diff --git a/monasca_common/kafka_lib/version.py b/monasca_common/kafka_lib/version.py new file mode 100644 index 00000000..9272695b --- /dev/null +++ b/monasca_common/kafka_lib/version.py @@ -0,0 +1 @@ +__version__ = '0.9.5' diff --git a/monasca_common/tests/test_kafka.py b/monasca_common/tests/test_kafka.py index 4a7bae7f..115482a3 100644 --- a/monasca_common/tests/test_kafka.py +++ b/monasca_common/tests/test_kafka.py @@ -27,8 +27,8 @@ FAKE_KAFKA_TOPIC = "topic" class TestKafkaProducer(unittest.TestCase): def setUp(self): - self.kafka_client_patcher = mock.patch('kafka.client') - self.kafka_producer_patcher = mock.patch('kafka.producer') + self.kafka_client_patcher = mock.patch('monasca_common.kafka.producer.kafka_client') + self.kafka_producer_patcher = mock.patch('monasca_common.kafka.producer.kafka_producer') self.mock_kafka_client = self.kafka_client_patcher.start() self.mock_kafka_producer = self.kafka_producer_patcher.start() self.producer = self.mock_kafka_producer.KeyedProducer.return_value @@ -86,9 +86,9 @@ class TestKafkaProducer(unittest.TestCase): class TestKafkaConsumer(unittest.TestCase): def setUp(self): - self.kafka_client_patcher = mock.patch('kafka.client') - self.kafka_common_patcher = mock.patch('kafka.common') - self.kafka_consumer_patcher = mock.patch('kafka.consumer') + self.kafka_client_patcher = mock.patch('monasca_common.kafka.consumer.kafka_client') + self.kafka_common_patcher = mock.patch('monasca_common.kafka.consumer.kafka_common') + self.kafka_consumer_patcher = mock.patch('monasca_common.kafka.consumer.kafka_consumer') self.kazoo_patcher = mock.patch( 'monasca_common.kafka.consumer.KazooClient') diff --git a/requirements.txt b/requirements.txt index 1960936d..6765fec5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,7 +4,7 @@ iso8601>=0.1.11 # MIT six>=1.9.0 # MIT kazoo>=2.2 # Apache-2.0 -kafka-python<1.0.0,>=0.9.5 # Apache-2.0 +pykafka>=2.5.0 # Apache 2.0 License PyMySQL>=0.7.6 # MIT License oslo.config!=3.18.0,>=3.14.0 # Apache-2.0 pbr>=1.8 # Apache-2.0 diff --git a/tox.ini b/tox.ini index 9de75415..8125f05f 100644 --- a/tox.ini +++ b/tox.ini @@ -33,3 +33,7 @@ max-line-length = 120 builtins = _ exclude=.venv,.git,.tox,dist,*egg,build show-source = True +# note: Due to the need to fork kafka-python, many pep8 violations occure. +# All of the below ignores are caused by the forked kafka-python library +# so when monasca migrates to pykafka, the below line can be removed. +ignore = E121,E126,E127,E128,E131,E221,E226,E241,E251,E261,E302,E303,E501,E701,F401,H101,H102,H301,H304,H306,H404,H405