Fork kafka-python 0.9.5 and require pykafka

To let other OpenStack projects move forward with new versions of kafka-python,
we're forking kafka-python 0.9.5 and embedding it in monasca-common. This allows
us to migrate to the async interfaces provided by more recent Kafka clients over
time without blocking other projects.
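For downstream callers the only visible change is the import path; a minimal
before/after sketch (the broker address is a placeholder):

    # Before: imports resolved to the external kafka-python package
    # import kafka.client
    # client = kafka.client.KafkaClient("localhost:9092")

    # After: the same code uses the copy embedded in monasca-common
    import monasca_common.kafka_lib.client as kafka_client
    client = kafka_client.KafkaClient("localhost:9092")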

Require pykafka so that we can get roughly 4x more throughput once we write to
its async interfaces.
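As a rough sketch of the pykafka interface we expect to write to (not part of
this change; broker address and topic name are placeholders):

    from pykafka import KafkaClient

    client = KafkaClient(hosts="localhost:9092")
    topic = client.topics[b'example_topic']

    # get_producer() returns pykafka's asynchronous producer; messages are
    # queued and flushed by a background worker rather than sent per call.
    with topic.get_producer() as producer:
        producer.produce(b'hello kafka')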

Change-Id: Ifb6ab67ce1335a5ec4ed7dd8b0027dc9d46a6dda
Depends-On: I26f9c588f2818059ab6ba24f9fad8e213798a39c
Author: Joe Keen
Date:   2017-01-20 10:49:58 -07:00
parent aae558035e
commit cb2ec23cee
28 changed files with 4864 additions and 17 deletions

View File

@@ -18,9 +18,9 @@ import logging
import threading
import time
import kafka.client
import kafka.common
import kafka.consumer
import monasca_common.kafka_lib.client as kafka_client
import monasca_common.kafka_lib.common as kafka_common
import monasca_common.kafka_lib.consumer as kafka_consumer
from kazoo.client import KazooClient
from kazoo.recipe.partitioner import SetPartitioner
@@ -89,13 +89,13 @@ class KafkaConsumer(object):
self._zookeeper_url = zookeeper_url
self._zookeeper_path = zookeeper_path
self._kafka = kafka.client.KafkaClient(kafka_url)
self._kafka = kafka_client.KafkaClient(kafka_url)
self._consumer = self._create_kafka_consumer()
def _create_kafka_consumer(self, partitions=None):
# No auto-commit so that commits only happen after the message is processed.
consumer = kafka.consumer.SimpleConsumer(
consumer = kafka_consumer.SimpleConsumer(
self._kafka,
self._kafka_group,
self._kafka_topic,
@@ -144,7 +144,7 @@ class KafkaConsumer(object):
if time_delta.total_seconds() > self._commit_timeout:
self._commit_callback()
except kafka.common.OffsetOutOfRangeError:
except kafka_common.OffsetOutOfRangeError:
log.error("Kafka OffsetOutOfRange. Jumping to head.")
self._consumer.seek(0, 0)

View File

@@ -16,8 +16,8 @@
import logging
import time
import kafka.client
import kafka.producer
import monasca_common.kafka_lib.client as kafka_client
import monasca_common.kafka_lib.producer as kafka_producer
log = logging.getLogger(__name__)
@@ -32,11 +32,11 @@ class KafkaProducer(object):
url - kafka connection details
"""
self._kafka = kafka.client.KafkaClient(url)
self._producer = kafka.producer.KeyedProducer(
self._kafka = kafka_client.KafkaClient(url)
self._producer = kafka_producer.KeyedProducer(
self._kafka,
async=False,
req_acks=kafka.producer.KeyedProducer.ACK_AFTER_LOCAL_WRITE,
req_acks=kafka_producer.KeyedProducer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=2000)
def publish(self, topic, messages, key=None):

View File

@@ -0,0 +1,32 @@
For 0.8, we have correlation id so we can potentially interleave requests/responses
There are a few levels of abstraction:
* Protocol support: encode/decode the requests/responses
* Socket support: send/receive messages
* API support: higher level APIs such as: get_topic_metadata
# Methods of producing
* Round robin (each message to the next partition)
* All-to-one (each message to one partition)
* All-to-all? (each message to every partition)
* Partitioned (run each message through a partitioning function)
** HashPartitioned
** FunctionPartition
# Possible API
client = KafkaClient("localhost:9092")
producer = KafkaProducer(client, "topic")
producer.send_string("hello")
consumer = KafkaConsumer(client, "group", "topic")
consumer.seek(10, 2) # seek to beginning (lowest offset)
consumer.commit() # commit it
for msg in consumer.iter_messages():
print(msg)

View File

@@ -0,0 +1,21 @@
# __title__ = 'kafka'
from .version import __version__
__author__ = 'David Arthur'
__license__ = 'Apache License 2.0'
__copyright__ = 'Copyright 2015, David Arthur under Apache License, v2.0'
# from monasca_common.kafka_lib.client import KafkaClient
# from monasca_common.kafka_lib.conn import KafkaConnection
# from monasca_common.kafka_lib.protocol import (
# create_message, create_gzip_message, create_snappy_message
# )
# from monasca_common.kafka_lib.producer import SimpleProducer, KeyedProducer
# from monasca_common.kafka_lib.partitioner import RoundRobinPartitioner, HashedPartitioner, Murmur2Partitioner
# from monasca_common.kafka_lib.consumer import SimpleConsumer, MultiProcessConsumer, KafkaConsumer
#
# __all__ = [
# 'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'KeyedProducer',
# 'RoundRobinPartitioner', 'HashedPartitioner', 'SimpleConsumer',
# 'MultiProcessConsumer', 'create_message', 'create_gzip_message',
# 'create_snappy_message', 'KafkaConsumer',
# ]

View File

@@ -0,0 +1,676 @@
import collections
import copy
import functools
import logging
import select
import time
import monasca_common.kafka_lib.common as kafka_common
from monasca_common.kafka_lib.common import (TopicAndPartition, BrokerMetadata,
ConnectionError, FailedPayloadsError,
KafkaTimeoutError, KafkaUnavailableError,
LeaderNotAvailableError, UnknownTopicOrPartitionError,
NotLeaderForPartitionError, ReplicaNotAvailableError)
from monasca_common.kafka_lib.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from monasca_common.kafka_lib.protocol import KafkaProtocol
from monasca_common.kafka_lib.util import kafka_bytestring
log = logging.getLogger(__name__)
class KafkaClient(object):
CLIENT_ID = b'kafka-python'
# NOTE: The timeout given to the client should always be greater than the
# one passed to SimpleConsumer.get_message(), otherwise you can get a
# socket timeout.
def __init__(self, hosts, client_id=CLIENT_ID,
timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS,
correlation_id=0):
# We need one connection to bootstrap
self.client_id = kafka_bytestring(client_id)
self.timeout = timeout
self.hosts = collect_hosts(hosts)
self.correlation_id = correlation_id
# create connections only when we need them
self.conns = {}
self.brokers = {} # broker_id -> BrokerMetadata
self.topics_to_brokers = {} # TopicAndPartition -> BrokerMetadata
self.topic_partitions = {} # topic -> partition -> PartitionMetadata
self.load_metadata_for_topics() # bootstrap with all metadata
##################
# Private API #
##################
def _get_conn(self, host, port):
"""Get or create a connection to a broker using host and port"""
host_key = (host, port)
if host_key not in self.conns:
self.conns[host_key] = KafkaConnection(
host,
port,
timeout=self.timeout
)
return self.conns[host_key]
def _get_leader_for_partition(self, topic, partition):
"""
Returns the leader for a partition or None if the partition exists
but has no leader.
UnknownTopicOrPartitionError will be raised if the topic or partition
is not part of the metadata.
LeaderNotAvailableError is raised if server has metadata, but there is
no current leader
"""
key = TopicAndPartition(topic, partition)
# Use cached metadata if it is there
if self.topics_to_brokers.get(key) is not None:
return self.topics_to_brokers[key]
# Otherwise refresh metadata
# If topic does not already exist, this will raise
# UnknownTopicOrPartitionError if not auto-creating
# LeaderNotAvailableError otherwise until partitions are created
self.load_metadata_for_topics(topic)
# If the partition doesn't actually exist, raise
if partition not in self.topic_partitions.get(topic, []):
raise UnknownTopicOrPartitionError(key)
# If there's no leader for the partition, raise
meta = self.topic_partitions[topic][partition]
if meta.leader == -1:
raise LeaderNotAvailableError(meta)
# Otherwise return the BrokerMetadata
return self.brokers[meta.leader]
def _get_coordinator_for_group(self, group):
"""
Returns the coordinator broker for a consumer group.
ConsumerCoordinatorNotAvailableCode will be raised if the coordinator
does not currently exist for the group.
OffsetsLoadInProgressCode is raised if the coordinator is available
but is still loading offsets from the internal topic
"""
resp = self.send_consumer_metadata_request(group)
# If there's a problem with finding the coordinator, raise the
# provided error
kafka_common.check_error(resp)
# Otherwise return the BrokerMetadata
return BrokerMetadata(resp.nodeId, resp.host, resp.port)
def _next_id(self):
"""Generate a new correlation id"""
# modulo to keep w/i int32
self.correlation_id = (self.correlation_id + 1) % 2**31
return self.correlation_id
def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn):
"""
Attempt to send a broker-agnostic request to one of the available
brokers. Keep trying until you succeed.
"""
for (host, port) in self.hosts:
requestId = self._next_id()
log.debug('Request %s: %s', requestId, payloads)
try:
conn = self._get_conn(host, port)
request = encoder_fn(client_id=self.client_id,
correlation_id=requestId,
payloads=payloads)
conn.send(requestId, request)
response = conn.recv(requestId)
decoded = decoder_fn(response)
log.debug('Response %s: %s', requestId, decoded)
return decoded
except Exception:
log.exception('Error sending request [%s] to server %s:%s, '
'trying next server', requestId, host, port)
raise KafkaUnavailableError('All servers failed to process request')
def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
"""
Group a list of request payloads by topic+partition and send them to
the leader broker for that partition using the supplied encode/decode
functions
Arguments:
payloads: list of object-like entities with a topic (str) and
partition (int) attribute; payloads with duplicate topic-partitions
are not supported.
encode_fn: a method to encode the list of payloads to a request body,
must accept client_id, correlation_id, and payloads as
keyword arguments
decode_fn: a method to decode a response body into response objects.
The response objects must be object-like and have topic
and partition attributes
Returns:
List of response objects in the same order as the supplied payloads
"""
# encoders / decoders do not maintain ordering currently
# so we need to keep this so we can rebuild order before returning
original_ordering = [(p.topic, p.partition) for p in payloads]
# Group the requests by topic+partition
brokers_for_payloads = []
payloads_by_broker = collections.defaultdict(list)
responses = {}
for payload in payloads:
try:
leader = self._get_leader_for_partition(payload.topic,
payload.partition)
payloads_by_broker[leader].append(payload)
brokers_for_payloads.append(leader)
except KafkaUnavailableError as e:
log.warning('KafkaUnavailableError attempting to send request '
'on topic %s partition %d', payload.topic, payload.partition)
topic_partition = (payload.topic, payload.partition)
responses[topic_partition] = FailedPayloadsError(payload)
# For each broker, send the list of request payloads
# and collect the responses and errors
broker_failures = []
# For each KafkaConnection keep the real socket so that we can use
# a select to perform non-blocking I/O
connections_by_socket = {}
for broker, payloads in payloads_by_broker.items():
requestId = self._next_id()
log.debug('Request %s to %s: %s', requestId, broker, payloads)
request = encoder_fn(client_id=self.client_id,
correlation_id=requestId, payloads=payloads)
# Send the request, recv the response
try:
conn = self._get_conn(broker.host.decode('utf-8'), broker.port)
conn.send(requestId, request)
except ConnectionError as e:
broker_failures.append(broker)
log.warning('ConnectionError attempting to send request %s '
'to server %s: %s', requestId, broker, e)
for payload in payloads:
topic_partition = (payload.topic, payload.partition)
responses[topic_partition] = FailedPayloadsError(payload)
# No exception, try to get response
else:
# decoder_fn=None signal that the server is expected to not
# send a response. This probably only applies to
# ProduceRequest w/ acks = 0
if decoder_fn is None:
log.debug('Request %s does not expect a response '
'(skipping conn.recv)', requestId)
for payload in payloads:
topic_partition = (payload.topic, payload.partition)
responses[topic_partition] = None
continue
else:
connections_by_socket[conn.get_connected_socket()] = (conn, broker, requestId)
conn = None
while connections_by_socket:
sockets = connections_by_socket.keys()
rlist, _, _ = select.select(sockets, [], [], None)
conn, broker, requestId = connections_by_socket.pop(rlist[0])
try:
response = conn.recv(requestId)
except ConnectionError as e:
broker_failures.append(broker)
log.warning('ConnectionError attempting to receive a '
'response to request %s from server %s: %s',
requestId, broker, e)
for payload in payloads_by_broker[broker]:
topic_partition = (payload.topic, payload.partition)
responses[topic_partition] = FailedPayloadsError(payload)
else:
_resps = []
for payload_response in decoder_fn(response):
topic_partition = (payload_response.topic,
payload_response.partition)
responses[topic_partition] = payload_response
_resps.append(payload_response)
log.debug('Response %s: %s', requestId, _resps)
# Connection errors generally mean stale metadata
# although sometimes it means incorrect api request
# Unfortunately there is no good way to tell the difference
# so we'll just reset metadata on all errors to be safe
if broker_failures:
self.reset_all_metadata()
# Return responses in the same order as provided
return [responses[tp] for tp in original_ordering]
def _send_consumer_aware_request(self, group, payloads, encoder_fn, decoder_fn):
"""
Send a list of requests to the consumer coordinator for the group
specified using the supplied encode/decode functions. As the payloads
that use consumer-aware requests do not contain the group (e.g.
OffsetFetchRequest), all payloads must be for a single group.
Arguments:
group: the name of the consumer group (str) the payloads are for
payloads: list of object-like entities with topic (str) and
partition (int) attributes; payloads with duplicate
topic+partition are not supported.
encode_fn: a method to encode the list of payloads to a request body,
must accept client_id, correlation_id, and payloads as
keyword arguments
decode_fn: a method to decode a response body into response objects.
The response objects must be object-like and have topic
and partition attributes
Returns:
List of response objects in the same order as the supplied payloads
"""
# encoders / decoders do not maintain ordering currently
# so we need to keep this so we can rebuild order before returning
original_ordering = [(p.topic, p.partition) for p in payloads]
broker = self._get_coordinator_for_group(group)
# Send the list of request payloads and collect the responses and
# errors
responses = {}
requestId = self._next_id()
log.debug('Request %s to %s: %s', requestId, broker, payloads)
request = encoder_fn(client_id=self.client_id,
correlation_id=requestId, payloads=payloads)
# Send the request, recv the response
try:
conn = self._get_conn(broker.host.decode('utf-8'), broker.port)
conn.send(requestId, request)
except ConnectionError as e:
log.warning('ConnectionError attempting to send request %s '
'to server %s: %s', requestId, broker, e)
for payload in payloads:
topic_partition = (payload.topic, payload.partition)
responses[topic_partition] = FailedPayloadsError(payload)
# No exception, try to get response
else:
# decoder_fn=None signal that the server is expected to not
# send a response. This probably only applies to
# ProduceRequest w/ acks = 0
if decoder_fn is None:
log.debug('Request %s does not expect a response '
'(skipping conn.recv)', requestId)
for payload in payloads:
topic_partition = (payload.topic, payload.partition)
responses[topic_partition] = None
return []
try:
response = conn.recv(requestId)
except ConnectionError as e:
log.warning('ConnectionError attempting to receive a '
'response to request %s from server %s: %s',
requestId, broker, e)
for payload in payloads:
topic_partition = (payload.topic, payload.partition)
responses[topic_partition] = FailedPayloadsError(payload)
else:
_resps = []
for payload_response in decoder_fn(response):
topic_partition = (payload_response.topic,
payload_response.partition)
responses[topic_partition] = payload_response
_resps.append(payload_response)
log.debug('Response %s: %s', requestId, _resps)
# Return responses in the same order as provided
return [responses[tp] for tp in original_ordering]
def __repr__(self):
return '<KafkaClient client_id=%s>' % (self.client_id)
def _raise_on_response_error(self, resp):
# Response can be an unraised exception object (FailedPayloadsError)
if isinstance(resp, Exception):
raise resp
# Or a server api error response
try:
kafka_common.check_error(resp)
except (UnknownTopicOrPartitionError, NotLeaderForPartitionError):
self.reset_topic_metadata(resp.topic)
raise
# Return False if no error to enable list comprehensions
return False
#################
# Public API #
#################
def close(self):
for conn in self.conns.values():
conn.close()
def copy(self):
"""
Create an inactive copy of the client object, suitable for passing
to a separate thread.
Note that the copied connections are not initialized, so reinit() must
be called on the returned copy.
"""
c = copy.deepcopy(self)
for key in c.conns:
c.conns[key] = self.conns[key].copy()
return c
def reinit(self):
for conn in self.conns.values():
conn.reinit()
def reset_topic_metadata(self, *topics):
for topic in topics:
for topic_partition in list(self.topics_to_brokers.keys()):
if topic_partition.topic == topic:
del self.topics_to_brokers[topic_partition]
if topic in self.topic_partitions:
del self.topic_partitions[topic]
def reset_all_metadata(self):
self.topics_to_brokers.clear()
self.topic_partitions.clear()
def has_metadata_for_topic(self, topic):
topic = kafka_bytestring(topic)
return (
topic in self.topic_partitions
and len(self.topic_partitions[topic]) > 0
)
def get_partition_ids_for_topic(self, topic):
topic = kafka_bytestring(topic)
if topic not in self.topic_partitions:
return []
return sorted(list(self.topic_partitions[topic]))
@property
def topics(self):
return list(self.topic_partitions.keys())
def ensure_topic_exists(self, topic, timeout = 30):
start_time = time.time()
while not self.has_metadata_for_topic(topic):
if time.time() > start_time + timeout:
raise KafkaTimeoutError('Unable to create topic {0}'.format(topic))
try:
self.load_metadata_for_topics(topic)
except LeaderNotAvailableError:
pass
except UnknownTopicOrPartitionError:
# Server is not configured to auto-create
# retrying in this case will not help
raise
time.sleep(.5)
def load_metadata_for_topics(self, *topics):
"""
Fetch broker and topic-partition metadata from the server,
and update internal data:
broker list, topic/partition list, and topic/partition -> broker map
This method should be called after receiving any error
Arguments:
*topics (optional): If a list of topics is provided,
the metadata refresh will be limited to the specified topics only.
Exceptions:
----------
If the broker is configured to not auto-create topics,
expect UnknownTopicOrPartitionError for topics that don't exist
If the broker is configured to auto-create topics,
expect LeaderNotAvailableError for new topics
until partitions have been initialized.
Exceptions *will not* be raised in a full refresh (i.e. no topic list)
In this case, error codes will be logged as errors
Partition-level errors will also not be raised here
(a single partition w/o a leader, for example)
"""
topics = [kafka_bytestring(t) for t in topics]
if topics:
for topic in topics:
self.reset_topic_metadata(topic)
else:
self.reset_all_metadata()
resp = self.send_metadata_request(topics)
log.debug('Updating broker metadata: %s', resp.brokers)
log.debug('Updating topic metadata: %s', resp.topics)
self.brokers = dict([(broker.nodeId, broker)
for broker in resp.brokers])
for topic_metadata in resp.topics:
topic = topic_metadata.topic
partitions = topic_metadata.partitions
# Errors expected for new topics
try:
kafka_common.check_error(topic_metadata)
except (UnknownTopicOrPartitionError, LeaderNotAvailableError) as e:
# Raise if the topic was passed in explicitly
if topic in topics:
raise
# Otherwise, just log a warning
log.error('Error loading topic metadata for %s: %s', topic, type(e))
continue
self.topic_partitions[topic] = {}
for partition_metadata in partitions:
partition = partition_metadata.partition
leader = partition_metadata.leader
self.topic_partitions[topic][partition] = partition_metadata
# Populate topics_to_brokers dict
topic_part = TopicAndPartition(topic, partition)
# Check for partition errors
try:
kafka_common.check_error(partition_metadata)
# If No Leader, topics_to_brokers topic_partition -> None
except LeaderNotAvailableError:
log.error('No leader for topic %s partition %d', topic, partition)
self.topics_to_brokers[topic_part] = None
continue
# If one of the replicas is unavailable -- ignore
# this error code is provided for admin purposes only
# we never talk to replicas, only the leader
except ReplicaNotAvailableError:
log.debug('Some (non-leader) replicas not available for topic %s partition %d', topic, partition)
# If Known Broker, topic_partition -> BrokerMetadata
if leader in self.brokers:
self.topics_to_brokers[topic_part] = self.brokers[leader]
# If Unknown Broker, fake BrokerMetadata so we don't lose the id
# (not sure how this could happen. server could be in bad state)
else:
self.topics_to_brokers[topic_part] = BrokerMetadata(
leader, None, None
)
def send_metadata_request(self, payloads=[], fail_on_error=True,
callback=None):
encoder = KafkaProtocol.encode_metadata_request
decoder = KafkaProtocol.decode_metadata_response
return self._send_broker_unaware_request(payloads, encoder, decoder)
def send_consumer_metadata_request(self, payloads=[], fail_on_error=True,
callback=None):
encoder = KafkaProtocol.encode_consumer_metadata_request
decoder = KafkaProtocol.decode_consumer_metadata_response
return self._send_broker_unaware_request(payloads, encoder, decoder)
def send_produce_request(self, payloads=[], acks=1, timeout=1000,
fail_on_error=True, callback=None):
"""
Encode and send some ProduceRequests
ProduceRequests will be grouped by (topic, partition) and then
sent to a specific broker. Output is a list of responses in the
same order as the list of payloads specified
Arguments:
payloads (list of ProduceRequest): produce requests to send to kafka
ProduceRequest payloads must not contain duplicates for any
topic-partition.
acks (int, optional): how many acks the servers should receive from replica
brokers before responding to the request. If it is 0, the server
will not send any response. If it is 1, the server will wait
until the data is written to the local log before sending a
response. If it is -1, the server will wait until the message
is committed by all in-sync replicas before sending a response.
For any value > 1, the server will wait for this number of acks to
occur (but the server will never wait for more acknowledgements than
there are in-sync replicas). defaults to 1.
timeout (int, optional): maximum time in milliseconds the server can
await the receipt of the number of acks, defaults to 1000.
fail_on_error (bool, optional): raise exceptions on connection and
server response errors, defaults to True.
callback (function, optional): instead of returning the ProduceResponse,
first pass it through this function, defaults to None.
Returns:
list of ProduceResponses, or callback results if supplied, in the
order of input payloads
"""
encoder = functools.partial(
KafkaProtocol.encode_produce_request,
acks=acks,
timeout=timeout)
if acks == 0:
decoder = None
else:
decoder = KafkaProtocol.decode_produce_response
resps = self._send_broker_aware_request(payloads, encoder, decoder)
return [resp if not callback else callback(resp) for resp in resps
if resp is not None and
(not fail_on_error or not self._raise_on_response_error(resp))]
def send_fetch_request(self, payloads=[], fail_on_error=True,
callback=None, max_wait_time=100, min_bytes=4096):
"""
Encode and send a FetchRequest
Payloads are grouped by topic and partition so they can be pipelined
to the same brokers.
"""
encoder = functools.partial(KafkaProtocol.encode_fetch_request,
max_wait_time=max_wait_time,
min_bytes=min_bytes)
resps = self._send_broker_aware_request(
payloads, encoder,
KafkaProtocol.decode_fetch_response)
return [resp if not callback else callback(resp) for resp in resps
if not fail_on_error or not self._raise_on_response_error(resp)]
def send_offset_request(self, payloads=[], fail_on_error=True,
callback=None):
resps = self._send_broker_aware_request(
payloads,
KafkaProtocol.encode_offset_request,
KafkaProtocol.decode_offset_response)
return [resp if not callback else callback(resp) for resp in resps
if not fail_on_error or not self._raise_on_response_error(resp)]
def send_offset_commit_request(self, group, payloads=[],
fail_on_error=True, callback=None):
encoder = functools.partial(KafkaProtocol.encode_offset_commit_request,
group=group)
decoder = KafkaProtocol.decode_offset_commit_response
resps = self._send_broker_aware_request(payloads, encoder, decoder)
return [resp if not callback else callback(resp) for resp in resps
if not fail_on_error or not self._raise_on_response_error(resp)]
def send_offset_fetch_request(self, group, payloads=[],
fail_on_error=True, callback=None):
encoder = functools.partial(KafkaProtocol.encode_offset_fetch_request,
group=group)
decoder = KafkaProtocol.decode_offset_fetch_response
resps = self._send_broker_aware_request(payloads, encoder, decoder)
return [resp if not callback else callback(resp) for resp in resps
if not fail_on_error or not self._raise_on_response_error(resp)]
def send_offset_fetch_request_kafka(self, group, payloads=[],
fail_on_error=True, callback=None):
encoder = functools.partial(KafkaProtocol.encode_offset_fetch_request,
group=group, from_kafka=True)
decoder = KafkaProtocol.decode_offset_fetch_response
resps = self._send_consumer_aware_request(group, payloads, encoder, decoder)
return [resp if not callback else callback(resp) for resp in resps
if not fail_on_error or not self._raise_on_response_error(resp)]
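As an illustrative sketch (not part of the diff) of driving this client directly,
using the embedded module layout shown above and placeholder broker/topic names:

    import monasca_common.kafka_lib.client as kafka_client
    import monasca_common.kafka_lib.protocol as kafka_protocol
    from monasca_common.kafka_lib.common import ProduceRequest

    client = kafka_client.KafkaClient("localhost:9092")    # placeholder broker
    message = kafka_protocol.create_message(b"hello")      # wrap bytes in a Message
    request = ProduceRequest(topic=b"example_topic", partition=0, messages=[message])
    # acks=1 waits for the leader's local write; the response carries the offset
    (response,) = client.send_produce_request(payloads=[request], acks=1, timeout=1000)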

View File

@@ -0,0 +1,155 @@
import gzip
from io import BytesIO
import struct
from six.moves import xrange
_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
_XERIAL_V1_FORMAT = 'bccccccBii'
try:
import snappy
_HAS_SNAPPY = True
except ImportError:
_HAS_SNAPPY = False
def has_gzip():
return True
def has_snappy():
return _HAS_SNAPPY
def gzip_encode(payload, compresslevel=None):
if not compresslevel:
compresslevel = 9
with BytesIO() as buf:
# Gzip context manager introduced in python 2.6
# so old-fashioned way until we decide to not support 2.6
gzipper = gzip.GzipFile(fileobj=buf, mode="w", compresslevel=compresslevel)
try:
gzipper.write(payload)
finally:
gzipper.close()
result = buf.getvalue()
return result
def gzip_decode(payload):
with BytesIO(payload) as buf:
# Gzip context manager introduced in python 2.6
# so old-fashioned way until we decide to not support 2.6
gzipper = gzip.GzipFile(fileobj=buf, mode='r')
try:
result = gzipper.read()
finally:
gzipper.close()
return result
def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024):
"""Encodes the given data with snappy if xerial_compatible is set then the
stream is encoded in a fashion compatible with the xerial snappy library
The block size (xerial_blocksize) controls how frequent the blocking
occurs 32k is the default in the xerial library.
The format winds up being
+-------------+------------+--------------+------------+--------------+
| Header | Block1 len | Block1 data | Blockn len | Blockn data |
|-------------+------------+--------------+------------+--------------|
| 16 bytes | BE int32 | snappy bytes | BE int32 | snappy bytes |
+-------------+------------+--------------+------------+--------------+
It is important to note that the blocksize is the amount of uncompressed
data presented to snappy at each block, whereas the blocklen is the
number of bytes that will be present in the stream, that is the
length will always be <= blocksize.
"""
if not has_snappy():
raise NotImplementedError("Snappy codec is not available")
if xerial_compatible:
def _chunker():
for i in xrange(0, len(payload), xerial_blocksize):
yield payload[i:i+xerial_blocksize]
out = BytesIO()
header = b''.join([struct.pack('!' + fmt, dat) for fmt, dat
in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)])
out.write(header)
for chunk in _chunker():
block = snappy.compress(chunk)
block_size = len(block)
out.write(struct.pack('!i', block_size))
out.write(block)
out.seek(0)
return out.read()
else:
return snappy.compress(payload)
def _detect_xerial_stream(payload):
"""Detects if the data given might have been encoded with the blocking mode
of the xerial snappy library.
This mode writes a magic header of the format:
+--------+--------------+------------+---------+--------+
| Marker | Magic String | Null / Pad | Version | Compat |
|--------+--------------+------------+---------+--------|
| byte | c-string | byte | int32 | int32 |
|--------+--------------+------------+---------+--------|
| -126 | 'SNAPPY' | \0 | | |
+--------+--------------+------------+---------+--------+
The pad appears to be to ensure that SNAPPY is a valid cstring
The version is the version of this format as written by xerial,
in the wild this is currently 1 as such we only support v1.
Compat is there to claim the minimum supported version that
can read a xerial block stream, presently in the wild this is
1.
"""
if len(payload) > 16:
header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16])
return header == _XERIAL_V1_HEADER
return False
def snappy_decode(payload):
if not has_snappy():
raise NotImplementedError("Snappy codec is not available")
if _detect_xerial_stream(payload):
# TODO ? Should become a fileobj ?
out = BytesIO()
byt = payload[16:]
length = len(byt)
cursor = 0
while cursor < length:
block_size = struct.unpack_from('!i', byt[cursor:])[0]
# Skip the block size
cursor += 4
end = cursor + block_size
out.write(snappy.decompress(byt[cursor:end]))
cursor = end
out.seek(0)
return out.read()
else:
return snappy.decompress(payload)
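A usage note (not part of the diff; the embedded module path is assumed from the
layout above): these helpers round-trip payloads symmetrically:

    from monasca_common.kafka_lib import codec   # assumed module name

    payload = b"some bytes worth compressing"
    assert codec.gzip_decode(codec.gzip_encode(payload)) == payload
    # snappy support is optional; it depends on the python-snappy package
    if codec.has_snappy():
        assert codec.snappy_decode(codec.snappy_encode(payload)) == payload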

View File

@@ -0,0 +1,270 @@
import inspect
import sys
from collections import namedtuple
###############
# Structs #
###############
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
MetadataRequest = namedtuple("MetadataRequest",
["topics"])
MetadataResponse = namedtuple("MetadataResponse",
["brokers", "topics"])
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ConsumerMetadataRequest
ConsumerMetadataRequest = namedtuple("ConsumerMetadataRequest",
["groups"])
ConsumerMetadataResponse = namedtuple("ConsumerMetadataResponse",
["error", "nodeId", "host", "port"])
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI
ProduceRequest = namedtuple("ProduceRequest",
["topic", "partition", "messages"])
ProduceResponse = namedtuple("ProduceResponse",
["topic", "partition", "error", "offset"])
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchAPI
FetchRequest = namedtuple("FetchRequest",
["topic", "partition", "offset", "max_bytes"])
FetchResponse = namedtuple("FetchResponse",
["topic", "partition", "error", "highwaterMark", "messages"])
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
OffsetRequest = namedtuple("OffsetRequest",
["topic", "partition", "time", "max_offsets"])
OffsetResponse = namedtuple("OffsetResponse",
["topic", "partition", "error", "offsets"])
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
OffsetCommitRequest = namedtuple("OffsetCommitRequest",
["topic", "partition", "offset", "metadata"])
OffsetCommitResponse = namedtuple("OffsetCommitResponse",
["topic", "partition", "error"])
OffsetFetchRequest = namedtuple("OffsetFetchRequest",
["topic", "partition"])
OffsetFetchResponse = namedtuple("OffsetFetchResponse",
["topic", "partition", "offset", "metadata", "error"])
# Other useful structs
BrokerMetadata = namedtuple("BrokerMetadata",
["nodeId", "host", "port"])
TopicMetadata = namedtuple("TopicMetadata",
["topic", "error", "partitions"])
PartitionMetadata = namedtuple("PartitionMetadata",
["topic", "partition", "leader", "replicas", "isr", "error"])
OffsetAndMessage = namedtuple("OffsetAndMessage",
["offset", "message"])
Message = namedtuple("Message",
["magic", "attributes", "key", "value"])
TopicAndPartition = namedtuple("TopicAndPartition",
["topic", "partition"])
KafkaMessage = namedtuple("KafkaMessage",
["topic", "partition", "offset", "key", "value"])
# Define retry policy for async producer
# Limit value: int >= 0, 0 means no retries
RetryOptions = namedtuple("RetryOptions",
["limit", "backoff_ms", "retry_on_timeouts"])
#################
# Exceptions #
#################
class KafkaError(RuntimeError):
pass
class BrokerResponseError(KafkaError):
pass
class UnknownError(BrokerResponseError):
errno = -1
message = 'UNKNOWN'
class OffsetOutOfRangeError(BrokerResponseError):
errno = 1
message = 'OFFSET_OUT_OF_RANGE'
class InvalidMessageError(BrokerResponseError):
errno = 2
message = 'INVALID_MESSAGE'
class UnknownTopicOrPartitionError(BrokerResponseError):
errno = 3
message = 'UNKNOWN_TOPIC_OR_PARTITON'
class InvalidFetchRequestError(BrokerResponseError):
errno = 4
message = 'INVALID_FETCH_SIZE'
class LeaderNotAvailableError(BrokerResponseError):
errno = 5
message = 'LEADER_NOT_AVAILABLE'
class NotLeaderForPartitionError(BrokerResponseError):
errno = 6
message = 'NOT_LEADER_FOR_PARTITION'
class RequestTimedOutError(BrokerResponseError):
errno = 7
message = 'REQUEST_TIMED_OUT'
class BrokerNotAvailableError(BrokerResponseError):
errno = 8
message = 'BROKER_NOT_AVAILABLE'
class ReplicaNotAvailableError(BrokerResponseError):
errno = 9
message = 'REPLICA_NOT_AVAILABLE'
class MessageSizeTooLargeError(BrokerResponseError):
errno = 10
message = 'MESSAGE_SIZE_TOO_LARGE'
class StaleControllerEpochError(BrokerResponseError):
errno = 11
message = 'STALE_CONTROLLER_EPOCH'
class OffsetMetadataTooLargeError(BrokerResponseError):
errno = 12
message = 'OFFSET_METADATA_TOO_LARGE'
class StaleLeaderEpochCodeError(BrokerResponseError):
errno = 13
message = 'STALE_LEADER_EPOCH_CODE'
class OffsetsLoadInProgressCode(BrokerResponseError):
errno = 14
message = 'OFFSETS_LOAD_IN_PROGRESS_CODE'
class ConsumerCoordinatorNotAvailableCode(BrokerResponseError):
errno = 15
message = 'CONSUMER_COORDINATOR_NOT_AVAILABLE_CODE'
class NotCoordinatorForConsumerCode(BrokerResponseError):
errno = 16
message = 'NOT_COORDINATOR_FOR_CONSUMER_CODE'
class KafkaUnavailableError(KafkaError):
pass
class KafkaTimeoutError(KafkaError):
pass
class FailedPayloadsError(KafkaError):
def __init__(self, payload, *args):
super(FailedPayloadsError, self).__init__(*args)
self.payload = payload
class ConnectionError(KafkaError):
pass
class BufferUnderflowError(KafkaError):
pass
class ChecksumError(KafkaError):
pass
class ConsumerFetchSizeTooSmall(KafkaError):
pass
class ConsumerNoMoreData(KafkaError):
pass
class ConsumerTimeout(KafkaError):
pass
class ProtocolError(KafkaError):
pass
class UnsupportedCodecError(KafkaError):
pass
class KafkaConfigurationError(KafkaError):
pass
class AsyncProducerQueueFull(KafkaError):
def __init__(self, failed_msgs, *args):
super(AsyncProducerQueueFull, self).__init__(*args)
self.failed_msgs = failed_msgs
def _iter_broker_errors():
for name, obj in inspect.getmembers(sys.modules[__name__]):
if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and obj != BrokerResponseError:
yield obj
kafka_errors = dict([(x.errno, x) for x in _iter_broker_errors()])
def check_error(response):
if isinstance(response, Exception):
raise response
if response.error:
error_class = kafka_errors.get(response.error, UnknownError)
raise error_class(response)
RETRY_BACKOFF_ERROR_TYPES = (
KafkaUnavailableError, LeaderNotAvailableError,
ConnectionError, FailedPayloadsError
)
RETRY_REFRESH_ERROR_TYPES = (
NotLeaderForPartitionError, UnknownTopicOrPartitionError,
LeaderNotAvailableError, ConnectionError
)
RETRY_ERROR_TYPES = RETRY_BACKOFF_ERROR_TYPES + RETRY_REFRESH_ERROR_TYPES
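For illustration (not part of the diff): the error classes above are collected
into kafka_errors, keyed by broker error code, and check_error() raises the
mapped class for any response with a nonzero error:

    import monasca_common.kafka_lib.common as kafka_common

    # errno 3 maps to UnknownTopicOrPartitionError in the table above
    exc_class = kafka_common.kafka_errors.get(3, kafka_common.UnknownError)
    assert exc_class is kafka_common.UnknownTopicOrPartitionError

    resp = kafka_common.ProduceResponse(topic=b"example_topic", partition=0,
                                        error=3, offset=-1)
    try:
        kafka_common.check_error(resp)
    except kafka_common.UnknownTopicOrPartitionError:
        pass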

View File

@@ -0,0 +1,223 @@
import copy
import logging
from random import shuffle
import socket
import struct
from threading import local
import six
from monasca_common.kafka_lib.common import ConnectionError
log = logging.getLogger(__name__)
DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
DEFAULT_KAFKA_PORT = 9092
def collect_hosts(hosts, randomize=True):
"""
Collects a comma-separated set of hosts (host:port) and optionally
randomizes the returned list.
"""
if isinstance(hosts, six.string_types):
hosts = hosts.strip().split(',')
result = []
for host_port in hosts:
res = host_port.split(':')
host = res[0]
port = int(res[1]) if len(res) > 1 else DEFAULT_KAFKA_PORT
result.append((host.strip(), port))
if randomize:
shuffle(result)
return result
class KafkaConnection(local):
"""
A socket connection to a single Kafka broker
This class is _not_ thread safe. Each call to `send` must be followed
by a call to `recv` in order to get the correct response. Eventually,
we can do something in here to facilitate multiplexed requests/responses
since the Kafka API includes a correlation id.
Arguments:
host: the host name or IP address of a kafka broker
port: the port number the kafka broker is listening on
timeout: default 120. The socket timeout for sending and receiving data
in seconds. None means no timeout, so a request can block forever.
"""
def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
super(KafkaConnection, self).__init__()
self.host = host
self.port = port
self.timeout = timeout
self._sock = None
self.reinit()
def __getnewargs__(self):
return (self.host, self.port, self.timeout)
def __repr__(self):
return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)
###################
# Private API #
###################
def _raise_connection_error(self):
# Cleanup socket if we have one
if self._sock:
self.close()
# And then raise
raise ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port))
def _read_bytes(self, num_bytes):
bytes_left = num_bytes
responses = []
log.debug("About to read %d bytes from Kafka", num_bytes)
# Make sure we have a connection
if not self._sock:
self.reinit()
while bytes_left:
try:
data = self._sock.recv(min(bytes_left, 4096))
# Receiving empty string from recv signals
# that the socket is in error. we will never get
# more data from this socket
if data == b'':
raise socket.error("Not enough data to read message -- did server kill socket?")
except socket.error:
log.exception('Unable to receive data from Kafka')
self._raise_connection_error()
bytes_left -= len(data)
log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes)
responses.append(data)
return b''.join(responses)
##################
# Public API #
##################
# TODO multiplex socket communication to allow for multi-threaded clients
def get_connected_socket(self):
if not self._sock:
self.reinit()
return self._sock
def send(self, request_id, payload):
"""
Send a request to Kafka
Arguments:
request_id (int): can be any int (used only for debug logging...)
payload: an encoded kafka packet (see KafkaProtocol)
"""
log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id))
# Make sure we have a connection
if not self._sock:
self.reinit()
try:
self._sock.sendall(payload)
except socket.error:
log.exception('Unable to send payload to Kafka')
self._raise_connection_error()
def recv(self, request_id):
"""
Get a response packet from Kafka
Arguments:
request_id: can be any int (only used for debug logging...)
Returns:
str: Encoded kafka packet response from server
"""
log.debug("Reading response %d from Kafka" % request_id)
# Make sure we have a connection
if not self._sock:
self.reinit()
# Read the size off of the header
resp = self._read_bytes(4)
(size,) = struct.unpack('>i', resp)
# Read the remainder of the response
resp = self._read_bytes(size)
return resp
def copy(self):
"""
Create an inactive copy of the connection object, suitable for
passing to a background thread.
The returned copy is not connected; you must call reinit() before
using.
"""
c = copy.deepcopy(self)
# Python 3 doesn't copy custom attributes of the threadlocal subclass
c.host = copy.copy(self.host)
c.port = copy.copy(self.port)
c.timeout = copy.copy(self.timeout)
c._sock = None
return c
def close(self):
"""
Shutdown and close the connection socket
"""
log.debug("Closing socket connection for %s:%d" % (self.host, self.port))
if self._sock:
# Call shutdown to be a good TCP client
# But expect an error if the socket has already been
# closed by the server
try:
self._sock.shutdown(socket.SHUT_RDWR)
except socket.error:
pass
# Closing the socket should always succeed
self._sock.close()
self._sock = None
else:
log.debug("No socket found to close!")
def reinit(self):
"""
Re-initialize the socket connection
close current socket (if open)
and start a fresh connection
raise ConnectionError on error
"""
log.debug("Reinitializing socket connection for %s:%d" % (self.host, self.port))
if self._sock:
self.close()
try:
self._sock = socket.create_connection((self.host, self.port), self.timeout)
except socket.error:
log.exception('Unable to connect to kafka broker at %s:%d' % (self.host, self.port))
self._raise_connection_error()
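For illustration (not part of the diff), collect_hosts() parses the
comma-separated bootstrap string used elsewhere in the client:

    from monasca_common.kafka_lib.conn import collect_hosts

    # Port defaults to 9092 when omitted; randomize=False keeps the input order.
    hosts = collect_hosts("kafka1:9092, kafka2", randomize=False)
    assert hosts == [("kafka1", 9092), ("kafka2", 9092)]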

View File

@@ -0,0 +1,7 @@
from .simple import SimpleConsumer
from .multiprocess import MultiProcessConsumer
from .kafka import KafkaConsumer
__all__ = [
'SimpleConsumer', 'MultiProcessConsumer', 'KafkaConsumer'
]

View File

@@ -0,0 +1,229 @@
from __future__ import absolute_import
import atexit
import logging
import numbers
from threading import Lock
import monasca_common.kafka_lib.common as kafka_common
from monasca_common.kafka_lib.common import (
OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
UnknownTopicOrPartitionError, check_error, KafkaError
)
from monasca_common.kafka_lib.util import kafka_bytestring, ReentrantTimer
log = logging.getLogger('kafka.consumer')
AUTO_COMMIT_MSG_COUNT = 100
AUTO_COMMIT_INTERVAL = 5000
FETCH_DEFAULT_BLOCK_TIMEOUT = 1
FETCH_MAX_WAIT_TIME = 100
FETCH_MIN_BYTES = 4096
FETCH_BUFFER_SIZE_BYTES = 4096
MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8
ITER_TIMEOUT_SECONDS = 60
NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
FULL_QUEUE_WAIT_TIME_SECONDS = 0.1
MAX_BACKOFF_SECONDS = 60
class Consumer(object):
"""
Base class to be used by other consumers. Not to be used directly
This base class provides logic for
* initialization and fetching metadata of partitions
* Auto-commit logic
* APIs for fetching pending message count
"""
def __init__(self, client, group, topic, partitions=None, auto_commit=True,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
auto_commit_every_t=AUTO_COMMIT_INTERVAL):
self.client = client
self.topic = kafka_bytestring(topic)
self.group = None if group is None else kafka_bytestring(group)
self.client.load_metadata_for_topics(topic)
self.offsets = {}
if partitions is None:
partitions = self.client.get_partition_ids_for_topic(topic)
else:
assert all(isinstance(x, numbers.Integral) for x in partitions)
# Variables for handling offset commits
self.commit_lock = Lock()
self.commit_timer = None
self.count_since_commit = 0
self.auto_commit = auto_commit
self.auto_commit_every_n = auto_commit_every_n
self.auto_commit_every_t = auto_commit_every_t
# Set up the auto-commit timer
if auto_commit is True and auto_commit_every_t is not None:
self.commit_timer = ReentrantTimer(auto_commit_every_t,
self.commit)
self.commit_timer.start()
# Set initial offsets
if self.group is not None:
self.fetch_last_known_offsets(partitions)
else:
for partition in partitions:
self.offsets[partition] = 0
# Register a cleanup handler
def cleanup(obj):
obj.stop()
self._cleanup_func = cleanup
atexit.register(cleanup, self)
self.partition_info = False # Do not return partition info in msgs
def provide_partition_info(self):
"""
Indicates that partition info must be returned by the consumer
"""
self.partition_info = True
def fetch_last_known_offsets(self, partitions=None):
if self.group is None:
raise ValueError('KafkaClient.group must not be None')
if partitions is None:
partitions = self.client.get_partition_ids_for_topic(self.topic)
responses = self.client.send_offset_fetch_request(
self.group,
[OffsetFetchRequest(self.topic, p) for p in partitions],
fail_on_error=False
)
for resp in responses:
try:
check_error(resp)
# API spec says server won't set an error here
# but 0.8.1.1 does actually...
except UnknownTopicOrPartitionError:
pass
# -1 offset signals no commit is currently stored
if resp.offset == -1:
self.offsets[resp.partition] = 0
# Otherwise we committed the stored offset
# and need to fetch the next one
else:
self.offsets[resp.partition] = resp.offset
def commit(self, partitions=None):
"""Commit stored offsets to Kafka via OffsetCommitRequest (v0)
Keyword Arguments:
partitions (list): list of partitions to commit, default is to commit
all of them
Returns: True on success, False on failure
"""
# short circuit if nothing happened. This check is kept outside
# to prevent unnecessarily acquiring a lock for checking the state
if self.count_since_commit == 0:
return
with self.commit_lock:
# Do this check again, just in case the state has changed
# during the lock acquiring timeout
if self.count_since_commit == 0:
return
reqs = []
if partitions is None: # commit all partitions
partitions = list(self.offsets.keys())
log.debug('Committing new offsets for %s, partitions %s',
self.topic, partitions)
for partition in partitions:
offset = self.offsets[partition]
log.debug('Commit offset %d in SimpleConsumer: '
'group=%s, topic=%s, partition=%s',
offset, self.group, self.topic, partition)
reqs.append(OffsetCommitRequest(self.topic, partition,
offset, None))
try:
self.client.send_offset_commit_request(self.group, reqs)
except KafkaError as e:
log.error('%s saving offsets: %s', e.__class__.__name__, e)
return False
else:
self.count_since_commit = 0
return True
def _auto_commit(self):
"""
Check if we have to commit based on number of messages and commit
"""
# Check if we are supposed to do an auto-commit
if not self.auto_commit or self.auto_commit_every_n is None:
return
if self.count_since_commit >= self.auto_commit_every_n:
self.commit()
def stop(self):
if self.commit_timer is not None:
self.commit_timer.stop()
self.commit()
if hasattr(self, '_cleanup_func'):
# Remove cleanup handler now that we've stopped
# py3 supports unregistering
if hasattr(atexit, 'unregister'):
atexit.unregister(self._cleanup_func) # pylint: disable=no-member
# py2 requires removing from private attribute...
else:
# ValueError on list.remove() if the exithandler no longer
# exists is fine here
try:
atexit._exithandlers.remove((self._cleanup_func, (self,), {}))
except ValueError:
pass
del self._cleanup_func
def pending(self, partitions=None):
"""
Gets the pending message count
Keyword Arguments:
partitions (list): list of partitions to check for, default is to check all
"""
if partitions is None:
partitions = self.offsets.keys()
total = 0
reqs = []
for partition in partitions:
reqs.append(OffsetRequest(self.topic, partition, -1, 1))
resps = self.client.send_offset_request(reqs)
for resp in resps:
partition = resp.partition
pending = resp.offsets[0]
offset = self.offsets[partition]
total += pending - offset
return total

View File

@@ -0,0 +1,772 @@
from __future__ import absolute_import
from collections import namedtuple
from copy import deepcopy
import logging
import random
import sys
import time
import six
from monasca_common.kafka_lib.client import KafkaClient
from monasca_common.kafka_lib.common import (
OffsetFetchRequest, OffsetCommitRequest, OffsetRequest, FetchRequest,
check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError,
OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout,
FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError
)
from monasca_common.kafka_lib.util import kafka_bytestring
logger = logging.getLogger(__name__)
OffsetsStruct = namedtuple("OffsetsStruct", ["fetch", "highwater", "commit", "task_done"])
DEFAULT_CONSUMER_CONFIG = {
'client_id': __name__,
'group_id': None,
'bootstrap_servers': [],
'socket_timeout_ms': 30 * 1000,
'fetch_message_max_bytes': 1024 * 1024,
'auto_offset_reset': 'largest',
'fetch_min_bytes': 1,
'fetch_wait_max_ms': 100,
'refresh_leader_backoff_ms': 200,
'deserializer_class': lambda msg: msg,
'auto_commit_enable': False,
'auto_commit_interval_ms': 60 * 1000,
'auto_commit_interval_messages': None,
'consumer_timeout_ms': -1,
# Currently unused
'socket_receive_buffer_bytes': 64 * 1024,
'num_consumer_fetchers': 1,
'default_fetcher_backoff_ms': 1000,
'queued_max_message_chunks': 10,
'rebalance_max_retries': 4,
'rebalance_backoff_ms': 2000,
}
DEPRECATED_CONFIG_KEYS = {
'metadata_broker_list': 'bootstrap_servers',
}
class KafkaConsumer(object):
"""A simpler kafka consumer"""
DEFAULT_CONFIG = deepcopy(DEFAULT_CONSUMER_CONFIG)
def __init__(self, *topics, **configs):
self.configure(**configs)
self.set_topic_partitions(*topics)
def configure(self, **configs):
"""Configure the consumer instance
Configuration settings can be passed to constructor,
otherwise defaults will be used:
Keyword Arguments:
bootstrap_servers (list): List of initial broker nodes the consumer
should contact to bootstrap initial cluster metadata. This does
not have to be the full node list. It just needs to have at
least one broker that will respond to a Metadata API Request.
client_id (str): a unique name for this client. Defaults to
'kafka.consumer.kafka'.
group_id (str): the name of the consumer group to join,
Offsets are fetched / committed to this group name.
fetch_message_max_bytes (int, optional): Maximum bytes for each
topic/partition fetch request. Defaults to 1024*1024.
fetch_min_bytes (int, optional): Minimum amount of data the server
should return for a fetch request, otherwise wait up to
fetch_wait_max_ms for more data to accumulate. Defaults to 1.
fetch_wait_max_ms (int, optional): Maximum time for the server to
block waiting for fetch_min_bytes messages to accumulate.
Defaults to 100.
refresh_leader_backoff_ms (int, optional): Milliseconds to backoff
when refreshing metadata on errors (subject to random jitter).
Defaults to 200.
socket_timeout_ms (int, optional): TCP socket timeout in
milliseconds. Defaults to 30*1000.
auto_offset_reset (str, optional): A policy for resetting offsets on
OffsetOutOfRange errors. 'smallest' will move to the oldest
available message, 'largest' will move to the most recent. Any
other value will raise the exception. Defaults to 'largest'.
deserializer_class (callable, optional): Any callable that takes a
raw message value and returns a deserialized value. Defaults to
lambda msg: msg.
auto_commit_enable (bool, optional): Enabling auto-commit will cause
the KafkaConsumer to periodically commit offsets without an
explicit call to commit(). Defaults to False.
auto_commit_interval_ms (int, optional): If auto_commit_enabled,
the milliseconds between automatic offset commits. Defaults to
60 * 1000.
auto_commit_interval_messages (int, optional): If
auto_commit_enabled, a number of messages consumed between
automatic offset commits. Defaults to None (disabled).
consumer_timeout_ms (int, optional): number of milliseconds to wait
before throwing a timeout exception to the consumer if no message
is available for consumption. Defaults to -1 (don't throw an exception).
Configuration parameters are described in more detail at
http://kafka.apache.org/documentation.html#highlevelconsumerapi
"""
configs = self._deprecate_configs(**configs)
self._config = {}
for key in self.DEFAULT_CONFIG:
self._config[key] = configs.pop(key, self.DEFAULT_CONFIG[key])
if configs:
raise KafkaConfigurationError('Unknown configuration key(s): ' +
str(list(configs.keys())))
if self._config['auto_commit_enable']:
if not self._config['group_id']:
raise KafkaConfigurationError(
'KafkaConsumer configured to auto-commit '
'without required consumer group (group_id)'
)
# Check auto-commit configuration
if self._config['auto_commit_enable']:
logger.info("Configuring consumer to auto-commit offsets")
self._reset_auto_commit()
if not self._config['bootstrap_servers']:
raise KafkaConfigurationError(
'bootstrap_servers required to configure KafkaConsumer'
)
self._client = KafkaClient(
self._config['bootstrap_servers'],
client_id=self._config['client_id'],
timeout=(self._config['socket_timeout_ms'] / 1000.0)
)
def set_topic_partitions(self, *topics):
"""
Set the topic/partitions to consume
Optionally specify offsets to start from
Accepts types:
* str (utf-8): topic name (will consume all available partitions)
* tuple: (topic, partition)
* dict:
- { topic: partition }
- { topic: [partition list] }
- { topic: (partition tuple,) }
Optionally, offsets can be specified directly:
* tuple: (topic, partition, offset)
* dict: { (topic, partition): offset, ... }
Example:
.. code:: python
kafka = KafkaConsumer()
# Consume topic1-all; topic2-partition2; topic3-partition0
kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})
# Consume topic1-0 starting at offset 12, and topic2-1 at offset 45
# using tuples --
kafka.set_topic_partitions(("topic1", 0, 12), ("topic2", 1, 45))
# using dict --
kafka.set_topic_partitions({ ("topic1", 0): 12, ("topic2", 1): 45 })
"""
self._topics = []
self._client.load_metadata_for_topics()
# Setup offsets
self._offsets = OffsetsStruct(fetch=dict(),
commit=dict(),
highwater=dict(),
task_done=dict())
# Handle different topic types
for arg in topics:
# Topic name str -- all partitions
if isinstance(arg, (six.string_types, six.binary_type)):
topic = kafka_bytestring(arg)
for partition in self._client.get_partition_ids_for_topic(topic):
self._consume_topic_partition(topic, partition)
# (topic, partition [, offset]) tuple
elif isinstance(arg, tuple):
topic = kafka_bytestring(arg[0])
partition = arg[1]
self._consume_topic_partition(topic, partition)
if len(arg) == 3:
offset = arg[2]
self._offsets.fetch[(topic, partition)] = offset
# { topic: partitions, ... } dict
elif isinstance(arg, dict):
for key, value in six.iteritems(arg):
# key can be string (a topic)
if isinstance(key, (six.string_types, six.binary_type)):
topic = kafka_bytestring(key)
# topic: partition
if isinstance(value, int):
self._consume_topic_partition(topic, value)
# topic: [ partition1, partition2, ... ]
elif isinstance(value, (list, tuple)):
for partition in value:
self._consume_topic_partition(topic, partition)
else:
raise KafkaConfigurationError(
'Unknown topic type '
'(dict key must be int or list/tuple of ints)'
)
# (topic, partition): offset
elif isinstance(key, tuple):
topic = kafka_bytestring(key[0])
partition = key[1]
self._consume_topic_partition(topic, partition)
self._offsets.fetch[(topic, partition)] = value
else:
raise KafkaConfigurationError('Unknown topic type (%s)' % type(arg))
# If we have a consumer group, try to fetch stored offsets
if self._config['group_id']:
self._get_commit_offsets()
# Update missing fetch/commit offsets
for topic_partition in self._topics:
# Commit offsets default is None
if topic_partition not in self._offsets.commit:
self._offsets.commit[topic_partition] = None
# Skip if we already have a fetch offset from user args
if topic_partition not in self._offsets.fetch:
# Fetch offsets default is (1) commit
if self._offsets.commit[topic_partition] is not None:
self._offsets.fetch[topic_partition] = self._offsets.commit[topic_partition]
# or (2) auto reset
else:
self._offsets.fetch[topic_partition] = self._reset_partition_offset(topic_partition)
# highwater marks (received from server on fetch response)
# and task_done (set locally by user)
# should always get initialized to None
self._reset_highwater_offsets()
self._reset_task_done_offsets()
# Reset message iterator in case we were in the middle of one
self._reset_message_iterator()
def close(self):
"""Close this consumer's underlying client."""
self._client.close()
def next(self):
"""Return the next available message
Blocks indefinitely unless consumer_timeout_ms > 0
Returns:
a single KafkaMessage from the message iterator
Raises:
ConsumerTimeout after consumer_timeout_ms and no message
Note:
This is also the method called internally during iteration
"""
self._set_consumer_timeout_start()
while True:
try:
return six.next(self._get_message_iterator())
# Handle batch completion
except StopIteration:
self._reset_message_iterator()
self._check_consumer_timeout()
def fetch_messages(self):
"""Sends FetchRequests for all topic/partitions set for consumption
Returns:
Generator that yields KafkaMessage structs
after deserializing with the configured `deserializer_class`
Note:
Refreshes metadata on errors, and resets fetch offset on
OffsetOutOfRange, per the configured `auto_offset_reset` policy
See Also:
Key KafkaConsumer configuration parameters:
* `fetch_message_max_bytes`
* `fetch_wait_max_ms`
* `fetch_min_bytes`
* `deserializer_class`
* `auto_offset_reset`
"""
max_bytes = self._config['fetch_message_max_bytes']
max_wait_time = self._config['fetch_wait_max_ms']
min_bytes = self._config['fetch_min_bytes']
if not self._topics:
raise KafkaConfigurationError('No topics or partitions configured')
if not self._offsets.fetch:
raise KafkaConfigurationError(
'No fetch offsets found when calling fetch_messages'
)
fetches = [FetchRequest(topic, partition,
self._offsets.fetch[(topic, partition)],
max_bytes)
for (topic, partition) in self._topics]
# send_fetch_request will batch topic/partition requests by leader
responses = self._client.send_fetch_request(
fetches,
max_wait_time=max_wait_time,
min_bytes=min_bytes,
fail_on_error=False
)
for resp in responses:
if isinstance(resp, FailedPayloadsError):
logger.warning('FailedPayloadsError attempting to fetch data')
self._refresh_metadata_on_error()
continue
topic = kafka_bytestring(resp.topic)
partition = resp.partition
try:
check_error(resp)
except OffsetOutOfRangeError:
logger.warning('OffsetOutOfRange: topic %s, partition %d, '
'offset %d (Highwatermark: %d)',
topic, partition,
self._offsets.fetch[(topic, partition)],
resp.highwaterMark)
# Reset offset
self._offsets.fetch[(topic, partition)] = (
self._reset_partition_offset((topic, partition))
)
continue
except NotLeaderForPartitionError:
logger.warning("NotLeaderForPartitionError for %s - %d. "
"Metadata may be out of date",
topic, partition)
self._refresh_metadata_on_error()
continue
except RequestTimedOutError:
logger.warning("RequestTimedOutError for %s - %d",
topic, partition)
continue
# Track server highwater mark
self._offsets.highwater[(topic, partition)] = resp.highwaterMark
# Yield each message
# Kafka-python could raise an exception during iteration
# we are not catching -- user will need to address
for (offset, message) in resp.messages:
# deserializer_class could raise an exception here
val = self._config['deserializer_class'](message.value)
msg = KafkaMessage(topic, partition, offset, message.key, val)
# in some cases the server will return earlier messages
# than we requested. skip them per kafka spec
if offset < self._offsets.fetch[(topic, partition)]:
logger.debug('message offset less than fetched offset '
'skipping: %s', msg)
continue
# Only increment fetch offset
# if we safely got the message and deserialized
self._offsets.fetch[(topic, partition)] = offset + 1
# Then yield to user
yield msg
def get_partition_offsets(self, topic, partition, request_time_ms, max_num_offsets):
"""Request available fetch offsets for a single topic/partition
Keyword Arguments:
topic (str): topic for offset request
partition (int): partition for offset request
request_time_ms (int): Used to ask for all messages before a
certain time (ms). There are two special values.
Specify -1 to receive the latest offset (i.e. the offset of the
next coming message) and -2 to receive the earliest available
offset. Note that because offsets are pulled in descending
order, asking for the earliest offset will always return you a
single element.
max_num_offsets (int): Maximum offsets to include in the OffsetResponse
Returns:
a list of offsets in the OffsetResponse submitted for the provided
topic / partition. See:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
"""
reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)]
(resp,) = self._client.send_offset_request(reqs)
check_error(resp)
# Just for sanity..
# probably unnecessary
assert resp.topic == topic
assert resp.partition == partition
return resp.offsets
def offsets(self, group=None):
"""Get internal consumer offset values
Keyword Arguments:
group: Either "fetch", "commit", "task_done", or "highwater".
If no group specified, returns all groups.
Returns:
A copy of internal offsets struct
"""
if not group:
return {
'fetch': self.offsets('fetch'),
'commit': self.offsets('commit'),
'task_done': self.offsets('task_done'),
'highwater': self.offsets('highwater')
}
else:
return dict(deepcopy(getattr(self._offsets, group)))
def task_done(self, message):
"""Mark a fetched message as consumed.
Offsets for messages marked as "task_done" will be stored back
to the kafka cluster for this consumer group on commit()
Arguments:
message (KafkaMessage): the message to mark as complete
Returns:
True, unless the topic-partition for this message has not
been configured for the consumer. In normal operation, this
should not happen. But see github issue 364.
"""
topic_partition = (message.topic, message.partition)
if topic_partition not in self._topics:
logger.warning('Unrecognized topic/partition in task_done message: '
'{0}:{1}'.format(*topic_partition))
return False
offset = message.offset
# Warn on non-contiguous offsets
prev_done = self._offsets.task_done[topic_partition]
if prev_done is not None and offset != (prev_done + 1):
logger.warning('Marking task_done on a non-contiguous offset: %d != %d + 1',
offset, prev_done)
# Warn on smaller offsets than previous commit
# "commit" offsets are actually the offset of the next message to fetch.
prev_commit = self._offsets.commit[topic_partition]
if prev_commit is not None and ((offset + 1) <= prev_commit):
logger.warning('Marking task_done on a previously committed offset?: %d (+1) <= %d',
offset, prev_commit)
self._offsets.task_done[topic_partition] = offset
# Check for auto-commit
if self._does_auto_commit_messages():
self._incr_auto_commit_message_count()
if self._should_auto_commit():
self.commit()
return True
def commit(self):
"""Store consumed message offsets (marked via task_done())
to kafka cluster for this consumer_group.
Returns:
True on success, or False if no offsets were found for commit
Note:
this functionality requires server version >=0.8.1.1
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
"""
if not self._config['group_id']:
logger.warning('Cannot commit without a group_id!')
raise KafkaConfigurationError(
'Attempted to commit offsets '
'without a configured consumer group (group_id)'
)
# API supports storing metadata with each commit
# but for now it is unused
metadata = b''
offsets = self._offsets.task_done
commits = []
for topic_partition, task_done_offset in six.iteritems(offsets):
# Skip if None
if task_done_offset is None:
continue
# Commit offsets as the next offset to fetch
# which is consistent with the Java Client
# task_done is marked by messages consumed,
# so add one to mark the next message for fetching
commit_offset = (task_done_offset + 1)
# Skip if no change from previous committed
if commit_offset == self._offsets.commit[topic_partition]:
continue
commits.append(
OffsetCommitRequest(topic_partition[0], topic_partition[1],
commit_offset, metadata)
)
if commits:
logger.info('committing consumer offsets to group %s', self._config['group_id'])
resps = self._client.send_offset_commit_request(
kafka_bytestring(self._config['group_id']), commits,
fail_on_error=False
)
for r in resps:
check_error(r)
topic_partition = (r.topic, r.partition)
task_done = self._offsets.task_done[topic_partition]
self._offsets.commit[topic_partition] = (task_done + 1)
if self._config['auto_commit_enable']:
self._reset_auto_commit()
return True
else:
logger.info('No new offsets found to commit in group %s', self._config['group_id'])
return False
#
# Topic/partition management private methods
#
def _consume_topic_partition(self, topic, partition):
topic = kafka_bytestring(topic)
if not isinstance(partition, int):
raise KafkaConfigurationError('Unknown partition type (%s) '
'-- expected int' % type(partition))
if topic not in self._client.topic_partitions:
raise UnknownTopicOrPartitionError("Topic %s not found in broker metadata" % topic)
if partition not in self._client.get_partition_ids_for_topic(topic):
raise UnknownTopicOrPartitionError("Partition %d not found in Topic %s "
"in broker metadata" % (partition, topic))
logger.info("Configuring consumer to fetch topic '%s', partition %d", topic, partition)
self._topics.append((topic, partition))
def _refresh_metadata_on_error(self):
refresh_ms = self._config['refresh_leader_backoff_ms']
jitter_pct = 0.20
sleep_ms = random.randint(
int((1.0 - 0.5 * jitter_pct) * refresh_ms),
int((1.0 + 0.5 * jitter_pct) * refresh_ms)
)
while True:
logger.info("Sleeping for refresh_leader_backoff_ms: %d", sleep_ms)
time.sleep(sleep_ms / 1000.0)
try:
self._client.load_metadata_for_topics()
except KafkaUnavailableError:
logger.warning("Unable to refresh topic metadata... cluster unavailable")
self._check_consumer_timeout()
else:
logger.info("Topic metadata refreshed")
return
#
# Offset-managment private methods
#
def _get_commit_offsets(self):
logger.info("Consumer fetching stored offsets")
for topic_partition in self._topics:
(resp,) = self._client.send_offset_fetch_request(
kafka_bytestring(self._config['group_id']),
[OffsetFetchRequest(topic_partition[0], topic_partition[1])],
fail_on_error=False)
try:
check_error(resp)
# The API spec says the server won't set an error here
# but 0.8.1.1 does actually...
except UnknownTopicOrPartitionError:
pass
# -1 offset signals no commit is currently stored
if resp.offset == -1:
self._offsets.commit[topic_partition] = None
# Otherwise we committed the stored offset
# and need to fetch the next one
else:
self._offsets.commit[topic_partition] = resp.offset
def _reset_highwater_offsets(self):
for topic_partition in self._topics:
self._offsets.highwater[topic_partition] = None
def _reset_task_done_offsets(self):
for topic_partition in self._topics:
self._offsets.task_done[topic_partition] = None
def _reset_partition_offset(self, topic_partition):
(topic, partition) = topic_partition
LATEST = -1
EARLIEST = -2
request_time_ms = None
if self._config['auto_offset_reset'] == 'largest':
request_time_ms = LATEST
elif self._config['auto_offset_reset'] == 'smallest':
request_time_ms = EARLIEST
else:
# Let's raise a reasonable exception type if the user calls
# outside of an exception context
if sys.exc_info() == (None, None, None):
raise OffsetOutOfRangeError('Cannot reset partition offsets without a '
'valid auto_offset_reset setting '
'(largest|smallest)')
# Otherwise we should re-raise the upstream exception
# b/c it typically includes additional data about
# the request that triggered it, and we do not want to drop that
raise # pylint: disable-msg=E0704
(offset, ) = self.get_partition_offsets(topic, partition,
request_time_ms, max_num_offsets=1)
return offset
#
# Consumer Timeout private methods
#
def _set_consumer_timeout_start(self):
self._consumer_timeout = False
if self._config['consumer_timeout_ms'] >= 0:
self._consumer_timeout = time.time() + (self._config['consumer_timeout_ms'] / 1000.0)
def _check_consumer_timeout(self):
if self._consumer_timeout and time.time() > self._consumer_timeout:
raise ConsumerTimeout('Consumer timed out after %d ms' % self._config['consumer_timeout_ms'])
#
# Autocommit private methods
#
def _should_auto_commit(self):
if self._does_auto_commit_ms():
if time.time() >= self._next_commit_time:
return True
if self._does_auto_commit_messages():
if self._uncommitted_message_count >= self._config['auto_commit_interval_messages']:
return True
return False
def _reset_auto_commit(self):
self._uncommitted_message_count = 0
self._next_commit_time = None
if self._does_auto_commit_ms():
self._next_commit_time = time.time() + (self._config['auto_commit_interval_ms'] / 1000.0)
def _incr_auto_commit_message_count(self, n=1):
self._uncommitted_message_count += n
def _does_auto_commit_ms(self):
if not self._config['auto_commit_enable']:
return False
conf = self._config['auto_commit_interval_ms']
if conf is not None and conf > 0:
return True
return False
def _does_auto_commit_messages(self):
if not self._config['auto_commit_enable']:
return False
conf = self._config['auto_commit_interval_messages']
if conf is not None and conf > 0:
return True
return False
#
# Message iterator private methods
#
def __iter__(self):
return self
def __next__(self):
return self.next()
def _get_message_iterator(self):
# Fetch a new batch if needed
if self._msg_iter is None:
self._msg_iter = self.fetch_messages()
return self._msg_iter
def _reset_message_iterator(self):
self._msg_iter = None
#
# python private methods
#
def __repr__(self):
return '<{0} topics=({1})>'.format(
self.__class__.__name__,
'|'.join(["%s-%d" % topic_partition
for topic_partition in self._topics])
)
#
# other private methods
#
def _deprecate_configs(self, **configs):
for old, new in six.iteritems(DEPRECATED_CONFIG_KEYS):
if old in configs:
logger.warning('Deprecated Kafka Consumer configuration: %s. '
'Please use %s instead.', old, new)
old_value = configs.pop(old)
if new not in configs:
configs[new] = old_value
return configs
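# Usage sketch for the KafkaConsumer above. Assumptions: the fork keeps the
# upstream kafka-python 0.9.x layout (KafkaConsumer exported from
# monasca_common.kafka_lib.consumer, ConsumerTimeout from
# monasca_common.kafka_lib.common), config names such as bootstrap_servers are
# unchanged, and a broker is reachable on localhost:9092.
from monasca_common.kafka_lib.common import ConsumerTimeout
from monasca_common.kafka_lib.consumer import KafkaConsumer

consumer = KafkaConsumer('example_topic',
                         group_id='example_group',
                         bootstrap_servers=['localhost:9092'],
                         auto_commit_enable=False,
                         consumer_timeout_ms=5000)
try:
    for message in consumer:            # iteration calls next()/fetch_messages()
        print(message.topic, message.partition, message.offset, message.value)
        consumer.task_done(message)     # record successful processing locally
except ConsumerTimeout:
    pass                                # no message within consumer_timeout_ms
consumer.commit()                       # store task_done offsets in the cluster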


@ -0,0 +1,292 @@
from __future__ import absolute_import
from collections import namedtuple
import logging
from multiprocessing import Process, Manager as MPManager
try:
import queue # python 3
from queue import Full # the queue module is shadowed by a parameter below
except ImportError:
import Queue as queue # python 2
from Queue import Full
import time
from ..common import KafkaError
from .base import (
Consumer,
AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL,
NO_MESSAGES_WAIT_TIME_SECONDS,
FULL_QUEUE_WAIT_TIME_SECONDS,
MAX_BACKOFF_SECONDS,
)
from .simple import SimpleConsumer
log = logging.getLogger(__name__)
Events = namedtuple("Events", ["start", "pause", "exit"])
def _mp_consume(client, group, topic, queue, size, events, **consumer_options):
"""
A child process worker which consumes messages based on the
notifications given by the controller process
NOTE: Ideally, this would have been a method on the Consumer class.
However, the multiprocessing module has issues on Windows: the
functionality breaks unless this function is kept outside of a class.
"""
# Initial interval for retries in seconds.
interval = 1
while not events.exit.is_set():
try:
# Make the child processes open separate socket connections
client.reinit()
# We will start consumers without auto-commit. Auto-commit will be
# done by the master controller process.
consumer = SimpleConsumer(client, group, topic,
auto_commit=False,
auto_commit_every_n=None,
auto_commit_every_t=None,
**consumer_options)
# Ensure that the consumer provides the partition information
consumer.provide_partition_info()
while True:
# Wait till the controller indicates us to start consumption
events.start.wait()
# If we are asked to quit, do so
if events.exit.is_set():
break
# Consume messages and add them to the queue. If the controller
# indicates a specific number of messages, follow that advice
count = 0
message = consumer.get_message()
if message:
while True:
try:
queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS)
break
except Full:
if events.exit.is_set(): break
count += 1
# We have reached the required size. The controller might have
# more than it needs. Wait for a while.
# Without this logic, it is possible that we run into a big
# loop consuming all available messages before the controller
# can reset the 'start' event
if count == size.value:
events.pause.wait()
else:
# In case we did not receive any message, give up the CPU for
# a while before we try again
time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
consumer.stop()
except KafkaError as e:
# Retry with exponential backoff
log.error("Problem communicating with Kafka (%s), retrying in %d seconds..." % (e, interval))
time.sleep(interval)
interval = interval*2 if interval*2 < MAX_BACKOFF_SECONDS else MAX_BACKOFF_SECONDS
class MultiProcessConsumer(Consumer):
"""
A consumer implementation that consumes partitions for a topic in
parallel using multiple processes
Arguments:
client: a connected KafkaClient
group: a name for this consumer, used for offset storage and must be unique
If you are connecting to a server that does not support offset
commit/fetch (any prior to 0.8.1.1), then you *must* set this to None
topic: the topic to consume
Keyword Arguments:
partitions: An optional list of partitions to consume the data from
auto_commit: default True. Whether or not to auto commit the offsets
auto_commit_every_n: default 100. How many messages to consume
before a commit
auto_commit_every_t: default 5000. How much time (in milliseconds) to
wait before commit
num_procs: Number of processes to start for consuming messages.
The available partitions will be divided among these processes
partitions_per_proc: Number of partitions to be allocated per process
(overrides num_procs)
Auto commit details:
If both auto_commit_every_n and auto_commit_every_t are set, they will
reset one another when one is triggered. These triggers simply call the
commit method on this class. A manual call to commit will also reset
these triggers
"""
def __init__(self, client, group, topic,
partitions=None,
auto_commit=True,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
auto_commit_every_t=AUTO_COMMIT_INTERVAL,
num_procs=1,
partitions_per_proc=0,
**simple_consumer_options):
# Initiate the base consumer class
super(MultiProcessConsumer, self).__init__(
client, group, topic,
partitions=partitions,
auto_commit=auto_commit,
auto_commit_every_n=auto_commit_every_n,
auto_commit_every_t=auto_commit_every_t)
# Variables for managing and controlling the data flow from
# consumer child process to master
manager = MPManager()
self.queue = manager.Queue(1024) # Child consumers dump messages into this
self.events = Events(
start = manager.Event(), # Indicates the consumers to start fetch
exit = manager.Event(), # Requests the consumers to shutdown
pause = manager.Event()) # Requests the consumers to pause fetch
self.size = manager.Value('i', 0) # Indicator of number of messages to fetch
# dict.keys() returns a view in py3 + it's not a thread-safe operation
# http://blog.labix.org/2008/06/27/watch-out-for-listdictkeys-in-python-3
# It's safer to copy dict as it only runs during the init.
partitions = list(self.offsets.copy().keys())
# By default, start one consumer process for all partitions
# The logic below ensures that
# * we do not cross the num_procs limit
# * we have an even distribution of partitions among processes
if partitions_per_proc:
num_procs = len(partitions) // partitions_per_proc
if num_procs * partitions_per_proc < len(partitions):
num_procs += 1
# The final set of chunks
chunks = [partitions[proc::num_procs] for proc in range(num_procs)]
self.procs = []
for chunk in chunks:
options = {'partitions': list(chunk)}
if simple_consumer_options:
simple_consumer_options.pop('partitions', None)
options.update(simple_consumer_options)
args = (client.copy(), self.group, self.topic, self.queue,
self.size, self.events)
proc = Process(target=_mp_consume, args=args, kwargs=options)
proc.daemon = True
proc.start()
self.procs.append(proc)
def __repr__(self):
return '<MultiProcessConsumer group=%s, topic=%s, consumers=%d>' % \
(self.group, self.topic, len(self.procs))
def stop(self):
# Set exit and start off all waiting consumers
self.events.exit.set()
self.events.pause.set()
self.events.start.set()
for proc in self.procs:
proc.join()
proc.terminate()
super(MultiProcessConsumer, self).stop()
def __iter__(self):
"""
Iterator to consume the messages available on this consumer
"""
# Trigger the consumer procs to start off.
# We will iterate till there are no more messages available
self.size.value = 0
self.events.pause.set()
while True:
self.events.start.set()
try:
# We will block for a small while so that the consumers get
# a chance to run and put some messages in the queue
# TODO: This is a hack and will make the consumer block for
# at least one second. Need to find a better way of doing this
partition, message = self.queue.get(block=True, timeout=1)
except queue.Empty:
break
# Count, check and commit messages if necessary
self.offsets[partition] = message.offset + 1
self.events.start.clear()
self.count_since_commit += 1
self._auto_commit()
yield message
self.events.start.clear()
def get_messages(self, count=1, block=True, timeout=10):
"""
Fetch the specified number of messages
Keyword Arguments:
count: Indicates the maximum number of messages to be fetched
block: If True, the API will block till all messages are fetched.
If block is a positive integer the API will block until that
many messages are fetched.
timeout: When blocking is requested the function will block for
the specified time (in seconds) until count messages are
fetched. If None, it will block forever.
"""
messages = []
# Give a size hint to the consumers. Each consumer process will fetch
# a maximum of "count" messages. This will fetch more messages than
# necessary, but these will not be committed to kafka. Also, the extra
# messages can be provided in subsequent runs
self.size.value = count
self.events.pause.clear()
if timeout is not None:
max_time = time.time() + timeout
new_offsets = {}
while count > 0 and (timeout is None or timeout > 0):
# Trigger consumption only if the queue is empty
# By doing this, we will ensure that consumers do not
# go into overdrive and keep consuming thousands of
# messages when the user might need only a few
if self.queue.empty():
self.events.start.set()
block_next_call = block is True or block > len(messages)
try:
partition, message = self.queue.get(block_next_call,
timeout)
except queue.Empty:
break
_msg = (partition, message) if self.partition_info else message
messages.append(_msg)
new_offsets[partition] = message.offset + 1
count -= 1
if timeout is not None:
timeout = max_time - time.time()
self.size.value = 0
self.events.start.clear()
self.events.pause.set()
# Update and commit offsets if necessary
self.offsets.update(new_offsets)
self.count_since_commit += len(messages)
self._auto_commit()
return messages
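# Usage sketch for MultiProcessConsumer. Assumptions: it is exported from
# monasca_common.kafka_lib.consumer as in upstream kafka-python 0.9.x, and a
# broker is reachable on localhost:9092. Partitions are split across two
# worker processes; each element returned by get_messages() is an
# OffsetAndMessage unless provide_partition_info() was called.
from monasca_common.kafka_lib.client import KafkaClient
from monasca_common.kafka_lib.consumer import MultiProcessConsumer

client = KafkaClient('localhost:9092')
consumer = MultiProcessConsumer(client, 'example_group', 'example_topic',
                                num_procs=2)
# Non-blocking drain of up to 100 messages gathered by the worker processes
for offset_and_message in consumer.get_messages(count=100, block=False):
    print(offset_and_message.offset, offset_and_message.message.value)
consumer.stop()
client.close()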


@ -0,0 +1,444 @@
from __future__ import absolute_import
try:
from itertools import zip_longest as izip_longest, repeat # pylint: disable-msg=E0611
except ImportError:
from itertools import izip_longest as izip_longest, repeat # python 2
import logging
try:
import queue # python 3
except ImportError:
import Queue as queue # python 2
import sys
import time
import six
from .base import (
Consumer,
FETCH_DEFAULT_BLOCK_TIMEOUT,
AUTO_COMMIT_MSG_COUNT,
AUTO_COMMIT_INTERVAL,
FETCH_MIN_BYTES,
FETCH_BUFFER_SIZE_BYTES,
MAX_FETCH_BUFFER_SIZE_BYTES,
FETCH_MAX_WAIT_TIME,
ITER_TIMEOUT_SECONDS,
NO_MESSAGES_WAIT_TIME_SECONDS
)
from ..common import (
FetchRequest, KafkaError, OffsetRequest,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
UnknownTopicOrPartitionError, NotLeaderForPartitionError,
OffsetOutOfRangeError, FailedPayloadsError, check_error
)
log = logging.getLogger(__name__)
class FetchContext(object):
"""
Class for managing the state of a consumer during fetch
"""
def __init__(self, consumer, block, timeout):
self.consumer = consumer
self.block = block
if block:
if not timeout:
timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
self.timeout = timeout * 1000
def __enter__(self):
"""Set fetch values based on blocking status"""
self.orig_fetch_max_wait_time = self.consumer.fetch_max_wait_time
self.orig_fetch_min_bytes = self.consumer.fetch_min_bytes
if self.block:
self.consumer.fetch_max_wait_time = self.timeout
self.consumer.fetch_min_bytes = 1
else:
self.consumer.fetch_min_bytes = 0
def __exit__(self, type, value, traceback):
"""Reset values"""
self.consumer.fetch_max_wait_time = self.orig_fetch_max_wait_time
self.consumer.fetch_min_bytes = self.orig_fetch_min_bytes
class SimpleConsumer(Consumer):
"""
A simple consumer implementation that consumes all/specified partitions
for a topic
Arguments:
client: a connected KafkaClient
group: a name for this consumer, used for offset storage and must be unique
If you are connecting to a server that does not support offset
commit/fetch (any prior to 0.8.1.1), then you *must* set this to None
topic: the topic to consume
Keyword Arguments:
partitions: An optional list of partitions to consume the data from
auto_commit: default True. Whether or not to auto commit the offsets
auto_commit_every_n: default 100. How many messages to consume
before a commit
auto_commit_every_t: default 5000. How much time (in milliseconds) to
wait before commit
fetch_size_bytes: number of bytes to request in a FetchRequest
buffer_size: default 4K. Initial number of bytes to tell kafka we
have available. This will double as needed.
max_buffer_size: default 16K. Max number of bytes to tell kafka we have
available. None means no limit.
iter_timeout: default None. How much time (in seconds) to wait for a
message in the iterator before exiting. None means no
timeout, so it will wait forever.
auto_offset_reset: default largest. Reset partition offsets upon
OffsetOutOfRangeError. Valid values are largest and smallest.
Otherwise, do not reset the offsets and raise OffsetOutOfRangeError.
Auto commit details:
If both auto_commit_every_n and auto_commit_every_t are set, they will
reset one another when one is triggered. These triggers simply call the
commit method on this class. A manual call to commit will also reset
these triggers
"""
def __init__(self, client, group, topic, auto_commit=True, partitions=None,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
auto_commit_every_t=AUTO_COMMIT_INTERVAL,
fetch_size_bytes=FETCH_MIN_BYTES,
buffer_size=FETCH_BUFFER_SIZE_BYTES,
max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
iter_timeout=None,
auto_offset_reset='largest'):
super(SimpleConsumer, self).__init__(
client, group, topic,
partitions=partitions,
auto_commit=auto_commit,
auto_commit_every_n=auto_commit_every_n,
auto_commit_every_t=auto_commit_every_t)
if max_buffer_size is not None and buffer_size > max_buffer_size:
raise ValueError('buffer_size (%d) is greater than '
'max_buffer_size (%d)' %
(buffer_size, max_buffer_size))
self.buffer_size = buffer_size
self.max_buffer_size = max_buffer_size
self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
self.fetch_min_bytes = fetch_size_bytes
self.fetch_offsets = self.offsets.copy()
self.iter_timeout = iter_timeout
self.auto_offset_reset = auto_offset_reset
self.queue = queue.Queue()
def __repr__(self):
return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
(self.group, self.topic, str(self.offsets.keys()))
def reset_partition_offset(self, partition):
"""Update offsets using auto_offset_reset policy (smallest|largest)
Arguments:
partition (int): the partition for which offsets should be updated
Returns: Updated offset on success, None on failure
"""
LATEST = -1
EARLIEST = -2
if self.auto_offset_reset == 'largest':
reqs = [OffsetRequest(self.topic, partition, LATEST, 1)]
elif self.auto_offset_reset == 'smallest':
reqs = [OffsetRequest(self.topic, partition, EARLIEST, 1)]
else:
# Let's raise a reasonable exception type if the user calls
# outside of an exception context
if sys.exc_info() == (None, None, None):
raise OffsetOutOfRangeError('Cannot reset partition offsets without a '
'valid auto_offset_reset setting '
'(largest|smallest)')
# Otherwise we should re-raise the upstream exception
# b/c it typically includes additional data about
# the request that triggered it, and we do not want to drop that
raise # pylint: disable-msg=E0704
# send_offset_request
log.info('Resetting topic-partition offset to %s for %s:%d',
self.auto_offset_reset, self.topic, partition)
try:
(resp, ) = self.client.send_offset_request(reqs)
except KafkaError as e:
log.error('%s sending offset request for %s:%d',
e.__class__.__name__, self.topic, partition)
else:
self.offsets[partition] = resp.offsets[0]
self.fetch_offsets[partition] = resp.offsets[0]
return resp.offsets[0]
def seek(self, offset, whence=None, partition=None):
"""
Alter the current offset in the consumer, similar to fseek
Arguments:
offset: how much to modify the offset
whence: where to modify it from, default is None
* None is an absolute offset
* 0 is relative to the earliest available offset (head)
* 1 is relative to the current offset
* 2 is relative to the latest known offset (tail)
partition: modify which partition, default is None.
If partition is None, would modify all partitions.
"""
if whence is None: # set an absolute offset
if partition is None:
for tmp_partition in self.offsets:
self.offsets[tmp_partition] = offset
else:
self.offsets[partition] = offset
elif whence == 1: # relative to current position
if partition is None:
for tmp_partition, _offset in self.offsets.items():
self.offsets[tmp_partition] = _offset + offset
else:
self.offsets[partition] += offset
elif whence in (0, 2): # relative to beginning or end
reqs = []
deltas = {}
if partition is None:
# divide the request offset by number of partitions,
# distribute the remainder evenly
(delta, rem) = divmod(offset, len(self.offsets))
for tmp_partition, r in izip_longest(self.offsets.keys(),
repeat(1, rem),
fillvalue=0):
deltas[tmp_partition] = delta + r
for tmp_partition in self.offsets.keys():
if whence == 0:
reqs.append(OffsetRequest(self.topic,
tmp_partition,
-2,
1))
elif whence == 2:
reqs.append(OffsetRequest(self.topic,
tmp_partition,
-1,
1))
else:
pass
else:
deltas[partition] = offset
if whence == 0:
reqs.append(OffsetRequest(self.topic, partition, -2, 1))
elif whence == 2:
reqs.append(OffsetRequest(self.topic, partition, -1, 1))
else:
pass
resps = self.client.send_offset_request(reqs)
for resp in resps:
self.offsets[resp.partition] = \
resp.offsets[0] + deltas[resp.partition]
else:
raise ValueError('Unexpected value for `whence`, %d' % whence)
# Reset queue and fetch offsets since they are invalid
self.fetch_offsets = self.offsets.copy()
self.count_since_commit += 1
if self.auto_commit:
self.commit()
self.queue = queue.Queue()
def get_messages(self, count=1, block=True, timeout=0.1):
"""
Fetch the specified number of messages
Keyword Arguments:
count: Indicates the maximum number of messages to be fetched
block: If True, the API will block till all messages are fetched.
If block is a positive integer the API will block until that
many messages are fetched.
timeout: When blocking is requested the function will block for
the specified time (in seconds) until count messages are
fetched. If None, it will block forever.
"""
messages = []
if timeout is not None:
timeout += time.time()
new_offsets = {}
log.debug('getting %d messages', count)
while len(messages) < count:
block_time = timeout - time.time()
log.debug('calling _get_message block=%s timeout=%s', block, block_time)
block_next_call = block is True or block > len(messages)
result = self._get_message(block_next_call, block_time,
get_partition_info=True,
update_offset=False)
log.debug('got %s from _get_messages', result)
if not result:
if block_next_call and (timeout is None or time.time() <= timeout):
continue
break
partition, message = result
_msg = (partition, message) if self.partition_info else message
messages.append(_msg)
new_offsets[partition] = message.offset + 1
# Update and commit offsets if necessary
self.offsets.update(new_offsets)
self.count_since_commit += len(messages)
self._auto_commit()
log.debug('got %d messages: %s', len(messages), messages)
return messages
def get_message(self, block=True, timeout=0.1, get_partition_info=None):
return self._get_message(block, timeout, get_partition_info)
def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
update_offset=True):
"""
If no messages can be fetched, returns None.
If get_partition_info is None, it defaults to self.partition_info
If get_partition_info is True, returns (partition, message)
If get_partition_info is False, returns message
"""
start_at = time.time()
while self.queue.empty():
# We're out of messages, go grab some more.
log.debug('internal queue empty, fetching more messages')
with FetchContext(self, block, timeout):
self._fetch()
if not block or time.time() > (start_at + timeout):
break
try:
partition, message = self.queue.get_nowait()
if update_offset:
# Update partition offset
self.offsets[partition] = message.offset + 1
# Count, check and commit messages if necessary
self.count_since_commit += 1
self._auto_commit()
if get_partition_info is None:
get_partition_info = self.partition_info
if get_partition_info:
return partition, message
else:
return message
except queue.Empty:
log.debug('internal queue empty after fetch - returning None')
return None
def __iter__(self):
if self.iter_timeout is None:
timeout = ITER_TIMEOUT_SECONDS
else:
timeout = self.iter_timeout
while True:
message = self.get_message(True, timeout)
if message:
yield message
elif self.iter_timeout is None:
# We did not receive any message yet but we don't have a
# timeout, so give up the CPU for a while before trying again
time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
else:
# Timed out waiting for a message
break
def _fetch(self):
# Create fetch request payloads for all the partitions
partitions = dict((p, self.buffer_size)
for p in self.fetch_offsets.keys())
while partitions:
requests = []
for partition, buffer_size in six.iteritems(partitions):
requests.append(FetchRequest(self.topic, partition,
self.fetch_offsets[partition],
buffer_size))
# Send request
responses = self.client.send_fetch_request(
requests,
max_wait_time=int(self.fetch_max_wait_time),
min_bytes=self.fetch_min_bytes,
fail_on_error=False
)
retry_partitions = {}
for resp in responses:
try:
check_error(resp)
except UnknownTopicOrPartitionError:
log.error('UnknownTopicOrPartitionError for %s:%d',
resp.topic, resp.partition)
self.client.reset_topic_metadata(resp.topic)
raise
except NotLeaderForPartitionError:
log.error('NotLeaderForPartitionError for %s:%d',
resp.topic, resp.partition)
self.client.reset_topic_metadata(resp.topic)
continue
except OffsetOutOfRangeError:
log.warning('OffsetOutOfRangeError for %s:%d. '
'Resetting partition offset...',
resp.topic, resp.partition)
self.reset_partition_offset(resp.partition)
# Retry this partition
retry_partitions[resp.partition] = partitions[resp.partition]
continue
except FailedPayloadsError as e:
log.warning('FailedPayloadsError for %s:%d',
e.payload.topic, e.payload.partition)
# Retry this partition
retry_partitions[e.payload.partition] = partitions[e.payload.partition]
continue
partition = resp.partition
buffer_size = partitions[partition]
try:
for message in resp.messages:
if message.offset < self.fetch_offsets[partition]:
log.debug('Skipping message %s because its offset is less than the consumer offset',
message)
continue
# Put the message in our queue
self.queue.put((partition, message))
self.fetch_offsets[partition] = message.offset + 1
except ConsumerFetchSizeTooSmall:
if (self.max_buffer_size is not None and
buffer_size == self.max_buffer_size):
log.error('Max fetch size %d too small',
self.max_buffer_size)
raise
if self.max_buffer_size is None:
buffer_size *= 2
else:
buffer_size = min(buffer_size * 2,
self.max_buffer_size)
log.warning('Fetch size too small, increase to %d (2x) '
'and retry', buffer_size)
retry_partitions[partition] = buffer_size
except ConsumerNoMoreData as e:
log.debug('Iteration was ended by %r', e)
except StopIteration:
# Stop iterating through this partition
log.debug('Done iterating over partition %s', partition)
partitions = retry_partitions
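# Usage sketch for SimpleConsumer. The KafkaClient and SimpleConsumer import
# paths match the kafka_lib layout used elsewhere in this change; the broker
# address, group and topic names are assumptions.
from monasca_common.kafka_lib.client import KafkaClient
from monasca_common.kafka_lib.consumer import SimpleConsumer

client = KafkaClient('localhost:9092')
consumer = SimpleConsumer(client, 'example_group', 'example_topic',
                          auto_commit=True, auto_commit_every_n=50)
consumer.seek(0, 0)                  # rewind to the earliest available offset
for offset_and_message in consumer.get_messages(count=10, block=True, timeout=5):
    print(offset_and_message.offset, offset_and_message.message.value)
consumer.commit()                    # flush any offsets not yet auto-committed
client.close()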


@ -0,0 +1,175 @@
"""
Context manager to commit/rollback consumer offsets.
"""
from logging import getLogger
from monasca_common.kafka_lib.common import check_error, OffsetCommitRequest, OffsetOutOfRangeError
class OffsetCommitContext(object):
"""
Provides commit/rollback semantics around a `SimpleConsumer`.
Usage assumes that `auto_commit` is disabled, that messages are consumed in
batches, and that the consuming process will record its own successful
processing of each message. Both the commit and rollback operations respect
a "high-water mark" to ensure that last unsuccessfully processed message
will be retried.
Example:
.. code:: python
consumer = SimpleConsumer(client, group, topic, auto_commit=False)
consumer.provide_partition_info()
consumer.fetch_last_known_offsets()
while some_condition:
with OffsetCommitContext(consumer) as context:
messages = consumer.get_messages(count, block=False)
for partition, message in messages:
if can_process(message):
context.mark(partition, message.offset)
else:
break
if not context:
sleep(delay)
These semantics allow for deferred message processing (e.g. if `can_process`
compares message time to clock time) and for repeated processing of the last
unsuccessful message (until some external error is resolved).
"""
def __init__(self, consumer):
"""
:param consumer: an instance of `SimpleConsumer`
"""
self.consumer = consumer
self.initial_offsets = None
self.high_water_mark = None
self.logger = getLogger("kafka.context")
def mark(self, partition, offset):
"""
Set the high-water mark in the current context.
In order to know the current partition, it is helpful to initialize
the consumer to provide partition info via:
.. code:: python
consumer.provide_partition_info()
"""
max_offset = max(offset + 1, self.high_water_mark.get(partition, 0))
self.logger.debug("Setting high-water mark to: %s",
{partition: max_offset})
self.high_water_mark[partition] = max_offset
def __nonzero__(self):
"""
Return whether any operations were marked in the context.
"""
return bool(self.high_water_mark)
def __enter__(self):
"""
Start a new context:
- Record the initial offsets for rollback
- Reset the high-water mark
"""
self.initial_offsets = dict(self.consumer.offsets)
self.high_water_mark = dict()
self.logger.debug("Starting context at: %s", self.initial_offsets)
return self
def __exit__(self, exc_type, exc_value, traceback):
"""
End a context.
- If there was no exception, commit up to the current high-water mark.
- If there was an offset of range error, attempt to find the correct
initial offset.
- If there was any other error, roll back to the initial offsets.
"""
if exc_type is None:
self.commit()
elif isinstance(exc_value, OffsetOutOfRangeError):
self.handle_out_of_range()
return True
else:
self.rollback()
def commit(self):
"""
Commit this context's offsets:
- If the high-water mark has moved, commit up to and position the
consumer at the high-water mark.
- Otherwise, reset to the consumer to the initial offsets.
"""
if self.high_water_mark:
self.logger.info("Committing offsets: %s", self.high_water_mark)
self.commit_partition_offsets(self.high_water_mark)
self.update_consumer_offsets(self.high_water_mark)
else:
self.update_consumer_offsets(self.initial_offsets)
def rollback(self):
"""
Rollback this context:
- Position the consumer at the initial offsets.
"""
self.logger.info("Rolling back context: %s", self.initial_offsets)
self.update_consumer_offsets(self.initial_offsets)
def commit_partition_offsets(self, partition_offsets):
"""
Commit explicit partition/offset pairs.
"""
self.logger.debug("Committing partition offsets: %s", partition_offsets)
commit_requests = [
OffsetCommitRequest(self.consumer.topic, partition, offset, None)
for partition, offset in partition_offsets.items()
]
commit_responses = self.consumer.client.send_offset_commit_request(
self.consumer.group,
commit_requests,
)
for commit_response in commit_responses:
check_error(commit_response)
def update_consumer_offsets(self, partition_offsets):
"""
Update consumer offsets to explicit positions.
"""
self.logger.debug("Updating consumer offsets to: %s", partition_offsets)
for partition, offset in partition_offsets.items():
self.consumer.offsets[partition] = offset
# consumer keeps other offset states beyond its `offsets` dictionary,
# a relative seek with zero delta forces the consumer to reset to the
# current value of the `offsets` dictionary
self.consumer.seek(0, 1)
def handle_out_of_range(self):
"""
Handle out of range condition by seeking to the beginning of valid
ranges.
This assumes that the out-of-range condition was not caused by seeking past
the end of valid ranges -- which is far less likely.
"""
self.logger.info("Seeking beginning of partition on out of range error")
self.consumer.seek(0, 0)
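# Complementary sketch of the rollback path not shown in the docstring above,
# reusing the `consumer` set up there: if an exception other than
# OffsetOutOfRangeError escapes the block, __exit__() rolls the consumer back
# to the offsets recorded on entry, so the same batch is fetched again on the
# next attempt. `process` stands in for application code.
def process(message):
    raise ValueError('simulated processing failure')

try:
    with OffsetCommitContext(consumer) as context:
        for partition, message in consumer.get_messages(10, block=False):
            process(message)
            context.mark(partition, message.offset)
except ValueError:
    pass  # offsets were rolled back; the batch can be retried later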


@ -0,0 +1,7 @@
from .roundrobin import RoundRobinPartitioner
from .hashed import HashedPartitioner, Murmur2Partitioner, LegacyPartitioner
__all__ = [
'RoundRobinPartitioner', 'HashedPartitioner', 'Murmur2Partitioner',
'LegacyPartitioner'
]


@ -0,0 +1,24 @@
class Partitioner(object):
"""
Base class for a partitioner
"""
def __init__(self, partitions):
"""
Initialize the partitioner
Arguments:
partitions: A list of available partitions (during startup)
"""
self.partitions = partitions
def partition(self, key, partitions=None):
"""
Takes a key and an optional list of partitions and returns
the partition to be used for the message
Arguments:
key: the key to use for partitioning
partitions: (optional) a list of partitions.
"""
raise NotImplementedError('partition function has to be implemented')
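# Minimal sketch of a custom partitioner built on the base class above
# (illustrative only, not part of the library): route each message to the
# partition given by the integer key modulo the number of partitions.
class ModuloPartitioner(Partitioner):
    def partition(self, key, partitions=None):
        if not partitions:
            partitions = self.partitions
        return partitions[int(key) % len(partitions)]

p = ModuloPartitioner([0, 1, 2, 3])
assert p.partition(b'10') == 2      # 10 % 4 == 2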


@ -0,0 +1,110 @@
import six
from .base import Partitioner
class Murmur2Partitioner(Partitioner):
"""
Implements a partitioner which selects the target partition based on
the hash of the key. Attempts to apply the same hashing
function as mainline java client.
"""
def partition(self, key, partitions=None):
if not partitions:
partitions = self.partitions
# https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java#L69
idx = (murmur2(key) & 0x7fffffff) % len(partitions)
return partitions[idx]
class LegacyPartitioner(Partitioner):
"""DEPRECATED -- See Issue 374
Implements a partitioner which selects the target partition based on
the hash of the key
"""
def partition(self, key, partitions=None):
if not partitions:
partitions = self.partitions
size = len(partitions)
idx = hash(key) % size
return partitions[idx]
# Default will change to Murmur2 in 0.10 release
HashedPartitioner = LegacyPartitioner
# https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244
def murmur2(key):
"""Pure-python Murmur2 implementation.
Based on java client, see org.apache.kafka.common.utils.Utils.murmur2
Args:
key: if not a bytes type, encoded using default encoding
Returns: MurmurHash2 of key bytearray
"""
# Convert key to bytes or bytearray
if isinstance(key, bytearray) or (six.PY3 and isinstance(key, bytes)):
data = key
else:
data = bytearray(str(key).encode())
length = len(data)
seed = 0x9747b28c
# 'm' and 'r' are mixing constants generated offline.
# They're not really 'magic', they just happen to work well.
m = 0x5bd1e995
r = 24
# Initialize the hash to a random value
h = seed ^ length
length4 = length // 4
for i in range(length4):
i4 = i * 4
k = ((data[i4 + 0] & 0xff) +
((data[i4 + 1] & 0xff) << 8) +
((data[i4 + 2] & 0xff) << 16) +
((data[i4 + 3] & 0xff) << 24))
k &= 0xffffffff
k *= m
k &= 0xffffffff
k ^= (k % 0x100000000) >> r # k ^= k >>> r
k &= 0xffffffff
k *= m
k &= 0xffffffff
h *= m
h &= 0xffffffff
h ^= k
h &= 0xffffffff
# Handle the last few bytes of the input array
extra_bytes = length % 4
if extra_bytes >= 3:
h ^= (data[(length & ~3) + 2] & 0xff) << 16
h &= 0xffffffff
if extra_bytes >= 2:
h ^= (data[(length & ~3) + 1] & 0xff) << 8
h &= 0xffffffff
if extra_bytes >= 1:
h ^= (data[length & ~3] & 0xff)
h &= 0xffffffff
h *= m
h &= 0xffffffff
h ^= (h % 0x100000000) >> 13 # h >>> 13;
h &= 0xffffffff
h *= m
h &= 0xffffffff
h ^= (h % 0x100000000) >> 15 # h >>> 15;
h &= 0xffffffff
return h
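# Quick sanity sketch for the hashing code above: murmur2() is deterministic,
# so a given key always maps to the same partition, and HashedPartitioner is
# currently an alias for LegacyPartitioner (python hash()) rather than
# Murmur2Partitioner.
partitions = [0, 1, 2, 3]
m2 = Murmur2Partitioner(partitions)
assert m2.partition(b'monasca') == m2.partition(b'monasca')
assert m2.partition(b'monasca') == partitions[(murmur2(b'monasca') & 0x7fffffff) % len(partitions)]
assert HashedPartitioner is LegacyPartitioner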


@ -0,0 +1,23 @@
from itertools import cycle
from .base import Partitioner
class RoundRobinPartitioner(Partitioner):
"""
Implements a round robin partitioner which sends data to partitions
in a round robin fashion
"""
def __init__(self, partitions):
super(RoundRobinPartitioner, self).__init__(partitions)
self.iterpart = cycle(partitions)
def _set_partitions(self, partitions):
self.partitions = partitions
self.iterpart = cycle(partitions)
def partition(self, key, partitions=None):
# Refresh the partition list if necessary
if partitions and self.partitions != partitions:
self._set_partitions(partitions)
return next(self.iterpart)
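# Behaviour sketch for RoundRobinPartitioner: the key is ignored and the
# configured partitions are handed out cyclically.
rr = RoundRobinPartitioner([0, 1, 2])
assert [rr.partition(None) for _ in range(5)] == [0, 1, 2, 0, 1]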


@ -0,0 +1,6 @@
from .simple import SimpleProducer
from .keyed import KeyedProducer
__all__ = [
'SimpleProducer', 'KeyedProducer'
]


@ -0,0 +1,462 @@
from __future__ import absolute_import
import atexit
import logging
import time
try:
from queue import Empty, Full, Queue
except ImportError:
from Queue import Empty, Full, Queue
from collections import defaultdict
from threading import Thread, Event
import six
from monasca_common.kafka_lib.common import (
ProduceRequest, ProduceResponse, TopicAndPartition, RetryOptions,
kafka_errors, UnsupportedCodecError, FailedPayloadsError,
RequestTimedOutError, AsyncProducerQueueFull, UnknownError,
RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES
)
from monasca_common.kafka_lib.protocol import CODEC_NONE, ALL_CODECS, create_message_set
from monasca_common.kafka_lib.util import kafka_bytestring
log = logging.getLogger('kafka.producer')
BATCH_SEND_DEFAULT_INTERVAL = 20
BATCH_SEND_MSG_COUNT = 20
# unlimited
ASYNC_QUEUE_MAXSIZE = 0
ASYNC_QUEUE_PUT_TIMEOUT = 0
# unlimited retries by default
ASYNC_RETRY_LIMIT = None
ASYNC_RETRY_BACKOFF_MS = 100
ASYNC_RETRY_ON_TIMEOUTS = True
ASYNC_LOG_MESSAGES_ON_ERROR = True
STOP_ASYNC_PRODUCER = -1
ASYNC_STOP_TIMEOUT_SECS = 30
SYNC_FAIL_ON_ERROR_DEFAULT = True
def _send_upstream(queue, client, codec, batch_time, batch_size,
req_acks, ack_timeout, retry_options, stop_event,
log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
stop_timeout=ASYNC_STOP_TIMEOUT_SECS,
codec_compresslevel=None):
"""Private method to manage producing messages asynchronously
Listens on the queue for a specified number of messages or until
a specified timeout and then sends messages to the brokers in grouped
requests (one per broker).
Messages placed on the queue should be tuples that conform to this format:
((topic, partition), message, key)
Currently does not mark messages with task_done. Do not attempt to join()!
Arguments:
queue (Queue.Queue): the queue from which to get messages
client (KafkaClient): instance to use for communicating with brokers
codec (kafka.protocol.ALL_CODECS): compression codec to use
batch_time (int): interval in seconds to send message batches
batch_size (int): count of messages that will trigger an immediate send
req_acks: required acks to use with ProduceRequests. see server protocol
ack_timeout: timeout to wait for required acks. see server protocol
retry_options (RetryOptions): settings for retry limits, backoff etc
stop_event (threading.Event): event to monitor for shutdown signal.
when this event is 'set', the producer will stop sending messages.
log_messages_on_error (bool, optional): log stringified message-contents
on any produce error, otherwise only log a hash() of the contents,
defaults to True.
stop_timeout (int or float, optional): number of seconds to continue
retrying messages after stop_event is set, defaults to 30.
"""
request_tries = {}
while not stop_event.is_set():
try:
client.reinit()
except Exception as e:
log.warn('Async producer failed to connect to brokers; backoff for %s(ms) before retrying', retry_options.backoff_ms)
time.sleep(float(retry_options.backoff_ms) / 1000)
else:
break
stop_at = None
while not (stop_event.is_set() and queue.empty() and not request_tries):
# Handle stop_timeout
if stop_event.is_set():
if not stop_at:
stop_at = stop_timeout + time.time()
if time.time() > stop_at:
log.debug('Async producer stopping due to stop_timeout')
break
timeout = batch_time
count = batch_size
send_at = time.time() + timeout
msgset = defaultdict(list)
# Merging messages will require a bit more work to manage correctly
# for now, dont look for new batches if we have old ones to retry
if request_tries:
count = 0
log.debug('Skipping new batch collection to handle retries')
else:
log.debug('Batching size: %s, timeout: %s', count, timeout)
# Keep fetching till we gather enough messages or a
# timeout is reached
while count > 0 and timeout >= 0:
try:
topic_partition, msg, key = queue.get(timeout=timeout)
except Empty:
break
# Check if the controller has requested us to stop
if topic_partition == STOP_ASYNC_PRODUCER:
stop_event.set()
break
# Adjust the timeout to match the remaining period
count -= 1
timeout = send_at - time.time()
msgset[topic_partition].append((msg, key))
# Send collected requests upstream
for topic_partition, msg in msgset.items():
messages = create_message_set(msg, codec, key, codec_compresslevel)
req = ProduceRequest(topic_partition.topic,
topic_partition.partition,
tuple(messages))
request_tries[req] = 0
if not request_tries:
continue
reqs_to_retry, error_cls = [], None
retry_state = {
'do_backoff': False,
'do_refresh': False
}
def _handle_error(error_cls, request):
if issubclass(error_cls, RETRY_ERROR_TYPES) or (retry_options.retry_on_timeouts and issubclass(error_cls, RequestTimedOutError)):
reqs_to_retry.append(request)
if issubclass(error_cls, RETRY_BACKOFF_ERROR_TYPES):
retry_state['do_backoff'] |= True
if issubclass(error_cls, RETRY_REFRESH_ERROR_TYPES):
retry_state['do_refresh'] |= True
requests = list(request_tries.keys())
log.debug('Sending: %s', requests)
responses = client.send_produce_request(requests,
acks=req_acks,
timeout=ack_timeout,
fail_on_error=False)
log.debug('Received: %s', responses)
for i, response in enumerate(responses):
error_cls = None
if isinstance(response, FailedPayloadsError):
error_cls = response.__class__
orig_req = response.payload
elif isinstance(response, ProduceResponse) and response.error:
error_cls = kafka_errors.get(response.error, UnknownError)
orig_req = requests[i]
if error_cls:
_handle_error(error_cls, orig_req)
log.error('%s sending ProduceRequest (#%d of %d) '
'to %s:%d with msgs %s',
error_cls.__name__, (i + 1), len(requests),
orig_req.topic, orig_req.partition,
orig_req.messages if log_messages_on_error
else hash(orig_req.messages))
if not reqs_to_retry:
request_tries = {}
continue
# doing backoff before next retry
if retry_state['do_backoff'] and retry_options.backoff_ms:
log.warn('Async producer backoff for %s(ms) before retrying', retry_options.backoff_ms)
time.sleep(float(retry_options.backoff_ms) / 1000)
# refresh topic metadata before next retry
if retry_state['do_refresh']:
log.warn('Async producer forcing metadata refresh before retrying')
try:
client.load_metadata_for_topics()
except Exception as e:
log.error("Async producer couldn't reload topic metadata. Error: `%s`", e.message)
# Apply retry limit, dropping messages that are over
request_tries = dict(
(key, count + 1)
for (key, count) in request_tries.items()
if key in reqs_to_retry
and (retry_options.limit is None
or (count < retry_options.limit))
)
# Log messages we are going to retry
for orig_req in request_tries.keys():
log.info('Retrying ProduceRequest to %s:%d with msgs %s',
orig_req.topic, orig_req.partition,
orig_req.messages if log_messages_on_error
else hash(orig_req.messages))
if request_tries or not queue.empty():
log.error('Stopped producer with {0} unsent messages'
.format(len(request_tries) + queue.qsize()))
class Producer(object):
"""
Base class to be used by producers
Arguments:
client (KafkaClient): instance to use for broker communications.
If async=True, the background thread will use client.copy(),
which is expected to return a thread-safe object.
codec (kafka.protocol.ALL_CODECS): compression codec to use.
req_acks (int, optional): A value indicating the acknowledgements that
the server must receive before responding to the request,
defaults to 1 (local ack).
ack_timeout (int, optional): millisecond timeout to wait for the
configured req_acks, defaults to 1000.
sync_fail_on_error (bool, optional): whether sync producer should
raise exceptions (True), or just return errors (False),
defaults to True.
async (bool, optional): send message using a background thread,
defaults to False.
batch_send_every_n (int, optional): If async is True, messages are
sent in batches of this size, defaults to 20.
batch_send_every_t (int or float, optional): If async is True,
messages are sent immediately after this timeout in seconds, even
if there are fewer than batch_send_every_n, defaults to 20.
async_retry_limit (int, optional): number of retries for failed messages
or None for unlimited, defaults to None / unlimited.
async_retry_backoff_ms (int, optional): milliseconds to backoff on
failed messages, defaults to 100.
async_retry_on_timeouts (bool, optional): whether to retry on
RequestTimeoutError, defaults to True.
async_queue_maxsize (int, optional): limit to the size of the
internal message queue in number of messages (not size), defaults
to 0 (no limit).
async_queue_put_timeout (int or float, optional): timeout seconds
for queue.put in send_messages for async producers -- will only
apply if async_queue_maxsize > 0 and the queue is Full,
defaults to 0 (fail immediately on full queue).
async_log_messages_on_error (bool, optional): set to False and the
async producer will only log hash() contents on failed produce
requests, defaults to True (log full messages). Hash logging
will not allow you to identify the specific message that failed,
but it will allow you to match failures with retries.
async_stop_timeout (int or float, optional): seconds to continue
attempting to send queued messages after producer.stop(),
defaults to 30.
Deprecated Arguments:
batch_send (bool, optional): If True, messages are sent by a background
thread in batches, defaults to False. Deprecated, use 'async'
"""
ACK_NOT_REQUIRED = 0 # No ack is required
ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log
ACK_AFTER_CLUSTER_COMMIT = -1 # Send response after data is committed
DEFAULT_ACK_TIMEOUT = 1000
def __init__(self, client,
req_acks=ACK_AFTER_LOCAL_WRITE,
ack_timeout=DEFAULT_ACK_TIMEOUT,
codec=None,
codec_compresslevel=None,
sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT,
async=False,
batch_send=False, # deprecated, use async
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
async_retry_limit=ASYNC_RETRY_LIMIT,
async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT,
async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS):
if async:
assert batch_send_every_n > 0
assert batch_send_every_t > 0
assert async_queue_maxsize >= 0
self.client = client
self.async = async
self.req_acks = req_acks
self.ack_timeout = ack_timeout
self.stopped = False
if codec is None:
codec = CODEC_NONE
elif codec not in ALL_CODECS:
raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)
self.codec = codec
self.codec_compresslevel = codec_compresslevel
if self.async:
# Messages are sent through this queue
self.queue = Queue(async_queue_maxsize)
self.async_queue_put_timeout = async_queue_put_timeout
async_retry_options = RetryOptions(
limit=async_retry_limit,
backoff_ms=async_retry_backoff_ms,
retry_on_timeouts=async_retry_on_timeouts)
self.thread_stop_event = Event()
self.thread = Thread(
target=_send_upstream,
args=(self.queue, self.client.copy(), self.codec,
batch_send_every_t, batch_send_every_n,
self.req_acks, self.ack_timeout,
async_retry_options, self.thread_stop_event),
kwargs={'log_messages_on_error': async_log_messages_on_error,
'stop_timeout': async_stop_timeout,
'codec_compresslevel': self.codec_compresslevel}
)
# Thread will die if main thread exits
self.thread.daemon = True
self.thread.start()
def cleanup(obj):
if not obj.stopped:
obj.stop()
self._cleanup_func = cleanup
atexit.register(cleanup, self)
else:
self.sync_fail_on_error = sync_fail_on_error
def send_messages(self, topic, partition, *msg):
"""
Helper method to send produce requests
@param: topic, name of topic for produce request -- type str
@param: partition, partition number for produce request -- type int
@param: *msg, one or more message payloads -- type bytes
@returns: ResponseRequest returned by server
raises on error
Note that each msg payload *must* be encoded to bytes by the user.
Passing a unicode message will not work; for example,
you should encode before calling send_messages via
something like `unicode_message.encode('utf-8')`
All messages produced via this method will set the message 'key' to Null
"""
topic = kafka_bytestring(topic)
return self._send_messages(topic, partition, *msg)
def _send_messages(self, topic, partition, *msg, **kwargs):
key = kwargs.pop('key', None)
# Guarantee that msg is actually a list or tuple (should always be true)
if not isinstance(msg, (list, tuple)):
raise TypeError("msg is not a list or tuple!")
for m in msg:
# The protocol allows to have key & payload with null values both,
# (https://goo.gl/o694yN) but having (null,null) pair doesn't make sense.
if m is None:
if key is None:
raise TypeError("key and payload can't be null in one")
# Raise TypeError if any non-null message is not encoded as bytes
elif not isinstance(m, six.binary_type):
raise TypeError("all produce message payloads must be null or type bytes")
# Raise TypeError if topic is not encoded as bytes
if not isinstance(topic, six.binary_type):
raise TypeError("the topic must be type bytes")
# Raise TypeError if the key is not encoded as bytes
if key is not None and not isinstance(key, six.binary_type):
raise TypeError("the key must be type bytes")
if self.async:
for idx, m in enumerate(msg):
try:
item = (TopicAndPartition(topic, partition), m, key)
if self.async_queue_put_timeout == 0:
self.queue.put_nowait(item)
else:
self.queue.put(item, True, self.async_queue_put_timeout)
except Full:
raise AsyncProducerQueueFull(
msg[idx:],
'Producer async queue overfilled. '
'Current queue size %d.' % self.queue.qsize())
resp = []
else:
messages = create_message_set([(m, key) for m in msg], self.codec, key, self.codec_compresslevel)
req = ProduceRequest(topic, partition, messages)
try:
resp = self.client.send_produce_request(
[req], acks=self.req_acks, timeout=self.ack_timeout,
fail_on_error=self.sync_fail_on_error
)
except Exception:
log.exception("Unable to send messages")
raise
return resp
def stop(self, timeout=None):
"""
Stop the producer (async mode). Blocks until async thread completes.
"""
if timeout is not None:
log.warning('timeout argument to stop() is deprecated - '
'it will be removed in future release')
if not self.async:
log.warning('producer.stop() called, but producer is not async')
return
if self.stopped:
log.warning('producer.stop() called, but producer is already stopped')
return
if self.async:
self.queue.put((STOP_ASYNC_PRODUCER, None, None))
self.thread_stop_event.set()
self.thread.join()
if hasattr(self, '_cleanup_func'):
# Remove cleanup handler now that we've stopped
# py3 supports unregistering
if hasattr(atexit, 'unregister'):
atexit.unregister(self._cleanup_func) # pylint: disable=no-member
# py2 requires removing from private attribute...
else:
# ValueError on list.remove() if the exithandler no longer exists
# but that is fine here
try:
atexit._exithandlers.remove((self._cleanup_func, (self,), {}))
except ValueError:
pass
del self._cleanup_func
self.stopped = True
def __del__(self):
if not self.stopped:
self.stop()
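A minimal usage sketch for the base Producer above, assuming the forked modules live at monasca_common.kafka_lib.client and monasca_common.kafka_lib.producer.base, a broker at localhost:9092, and a topic b'test' with a partition 0; payloads must already be bytes, as the send_messages docstring requires:

    from monasca_common.kafka_lib.client import KafkaClient
    from monasca_common.kafka_lib.producer.base import Producer

    client = KafkaClient('localhost:9092')

    # Synchronous mode: send_messages() blocks until the broker responds.
    producer = Producer(client, req_acks=Producer.ACK_AFTER_LOCAL_WRITE)
    producer.send_messages(b'test', 0, u'hello'.encode('utf-8'))

    # Async mode: messages are queued and drained by a daemon thread;
    # send_messages() returns [] immediately, stop() signals the thread and joins it.
    async_producer = Producer(client, async=True)
    async_producer.send_messages(b'test', 0, b'hello again')
    async_producer.stop()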

View File

@ -0,0 +1,51 @@
from __future__ import absolute_import
import logging
import warnings
from .base import Producer
from ..partitioner import HashedPartitioner
from ..util import kafka_bytestring
log = logging.getLogger(__name__)
class KeyedProducer(Producer):
"""
A producer which distributes messages to partitions based on the key
See Producer class for Arguments
Additional Arguments:
partitioner: A partitioner class that will be used to get the partition
to send the message to. Must be derived from Partitioner.
Defaults to HashedPartitioner.
"""
def __init__(self, *args, **kwargs):
self.partitioner_class = kwargs.pop('partitioner', HashedPartitioner)
self.partitioners = {}
super(KeyedProducer, self).__init__(*args, **kwargs)
def _next_partition(self, topic, key):
if topic not in self.partitioners:
if not self.client.has_metadata_for_topic(topic):
self.client.load_metadata_for_topics(topic)
self.partitioners[topic] = self.partitioner_class(self.client.get_partition_ids_for_topic(topic))
partitioner = self.partitioners[topic]
return partitioner.partition(key)
def send_messages(self, topic, key, *msg):
topic = kafka_bytestring(topic)
partition = self._next_partition(topic, key)
return self._send_messages(topic, partition, *msg, key=key)
# DEPRECATED
def send(self, topic, key, msg):
warnings.warn("KeyedProducer.send is deprecated in favor of send_messages", DeprecationWarning)
return self.send_messages(topic, key, msg)
def __repr__(self):
return '<KeyedProducer batch=%s>' % self.async
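A hedged sketch of KeyedProducer with the default HashedPartitioner; the module path, broker address, topic, and keys are assumptions:

    from monasca_common.kafka_lib.client import KafkaClient
    from monasca_common.kafka_lib.producer.keyed import KeyedProducer

    client = KafkaClient('localhost:9092')
    producer = KeyedProducer(client)

    # Messages with the same key hash to the same partition, so per-key
    # ordering is preserved across calls.
    producer.send_messages(b'metrics', b'host-1', b'cpu=0.42')
    producer.send_messages(b'metrics', b'host-1', b'cpu=0.37')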

View File

@ -0,0 +1,58 @@
from __future__ import absolute_import
from itertools import cycle
import logging
import random
import six
from six.moves import xrange
from .base import Producer
log = logging.getLogger(__name__)
class SimpleProducer(Producer):
"""A simple, round-robin producer.
See Producer class for Base Arguments
Additional Arguments:
random_start (bool, optional): randomize the initial partition to which
the first message block is published; if False, the first message
block is always published to partition 0 before cycling through
each partition. Defaults to True.
"""
def __init__(self, *args, **kwargs):
self.partition_cycles = {}
self.random_start = kwargs.pop('random_start', True)
super(SimpleProducer, self).__init__(*args, **kwargs)
def _next_partition(self, topic):
if topic not in self.partition_cycles:
if not self.client.has_metadata_for_topic(topic):
self.client.load_metadata_for_topics(topic)
self.partition_cycles[topic] = cycle(self.client.get_partition_ids_for_topic(topic))
# Randomize the initial partition that is returned
if self.random_start:
num_partitions = len(self.client.get_partition_ids_for_topic(topic))
for _ in xrange(random.randint(0, num_partitions-1)):
next(self.partition_cycles[topic])
return next(self.partition_cycles[topic])
def send_messages(self, topic, *msg):
if not isinstance(topic, six.binary_type):
topic = topic.encode('utf-8')
partition = self._next_partition(topic)
return super(SimpleProducer, self).send_messages(
topic, partition, *msg
)
def __repr__(self):
return '<SimpleProducer batch=%s>' % self.async
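A similar sketch for SimpleProducer; with random_start=False the per-topic cycle starts at partition 0 (the module path, broker address, and topic are assumptions):

    from monasca_common.kafka_lib.client import KafkaClient
    from monasca_common.kafka_lib.producer.simple import SimpleProducer

    client = KafkaClient('localhost:9092')
    producer = SimpleProducer(client, random_start=False)

    # Each send_messages() call publishes its batch to the next partition
    # in the cycle: 0, 1, 2, ..., then back to 0.
    producer.send_messages('events', b'first')
    producer.send_messages('events', b'second')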

View File

@ -0,0 +1,646 @@
import logging
import struct
import six
from six.moves import xrange
from monasca_common.kafka_lib.codec import (
gzip_encode, gzip_decode, snappy_encode, snappy_decode
)
from monasca_common.kafka_lib.common import (
Message, OffsetAndMessage, TopicAndPartition,
BrokerMetadata, TopicMetadata, PartitionMetadata,
MetadataResponse, ProduceResponse, FetchResponse,
OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
ProtocolError, BufferUnderflowError, ChecksumError,
ConsumerFetchSizeTooSmall, UnsupportedCodecError,
ConsumerMetadataResponse
)
from monasca_common.kafka_lib.util import (
crc32, read_short_string, read_int_string, relative_unpack,
write_short_string, write_int_string, group_by_topic_and_partition
)
log = logging.getLogger(__name__)
ATTRIBUTE_CODEC_MASK = 0x03
CODEC_NONE = 0x00
CODEC_GZIP = 0x01
CODEC_SNAPPY = 0x02
ALL_CODECS = (CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY)
class KafkaProtocol(object):
"""
Class to encapsulate all of the protocol encoding/decoding.
This class does not have any state associated with it; it is purely
for organization.
"""
PRODUCE_KEY = 0
FETCH_KEY = 1
OFFSET_KEY = 2
METADATA_KEY = 3
OFFSET_COMMIT_KEY = 8
OFFSET_FETCH_KEY = 9
CONSUMER_METADATA_KEY = 10
###################
# Private API #
###################
@classmethod
def _encode_message_header(cls, client_id, correlation_id, request_key,
version=0):
"""
Encode the common request envelope
"""
return struct.pack('>hhih%ds' % len(client_id),
request_key, # ApiKey
version, # ApiVersion
correlation_id, # CorrelationId
len(client_id), # ClientId size
client_id) # ClientId
@classmethod
def _encode_message_set(cls, messages):
"""
Encode a MessageSet. Unlike other arrays in the protocol,
MessageSets are not length-prefixed
Format
======
MessageSet => [Offset MessageSize Message]
Offset => int64
MessageSize => int32
"""
message_set = []
for message in messages:
encoded_message = KafkaProtocol._encode_message(message)
message_set.append(struct.pack('>qi%ds' % len(encoded_message), 0,
len(encoded_message),
encoded_message))
return b''.join(message_set)
@classmethod
def _encode_message(cls, message):
"""
Encode a single message.
The magic number of a message is a format version number.
The only supported magic number right now is zero
Format
======
Message => Crc MagicByte Attributes Key Value
Crc => int32
MagicByte => int8
Attributes => int8
Key => bytes
Value => bytes
"""
if message.magic == 0:
msg = b''.join([
struct.pack('>BB', message.magic, message.attributes),
write_int_string(message.key),
write_int_string(message.value)
])
crc = crc32(msg)
msg = struct.pack('>I%ds' % len(msg), crc, msg)
else:
raise ProtocolError("Unexpected magic number: %d" % message.magic)
return msg
@classmethod
def _decode_message_set_iter(cls, data):
"""
Iteratively decode a MessageSet
Reads repeated elements of (offset, message), calling decode_message
to decode a single message. Since compressed messages contain further
MessageSets, these two methods have been decoupled so that they may
recurse easily.
"""
cur = 0
read_message = False
while cur < len(data):
try:
((offset, ), cur) = relative_unpack('>q', data, cur)
(msg, cur) = read_int_string(data, cur)
for (offset, message) in KafkaProtocol._decode_message(msg, offset):
read_message = True
yield OffsetAndMessage(offset, message)
except BufferUnderflowError:
# NOTE: Not sure this is correct error handling:
# Is it possible to get a BUE if the message set is somewhere
# in the middle of the fetch response? If so, we probably have
# an issue that's not fetch size too small.
# Aren't we ignoring errors if we fail to unpack data by
# raising StopIteration()?
# If _decode_message() raises a ChecksumError, couldn't that
# also be due to the fetch size being too small?
if read_message is False:
# If we get a partial read of a message, but haven't
# yielded anything there's a problem
raise ConsumerFetchSizeTooSmall()
else:
raise StopIteration()
@classmethod
def _decode_message(cls, data, offset):
"""
Decode a single Message
The only caller of this method is decode_message_set_iter.
They are decoupled to support nested messages (compressed MessageSets).
The offset is actually read from decode_message_set_iter (it is part
of the MessageSet payload).
"""
((crc, magic, att), cur) = relative_unpack('>IBB', data, 0)
if crc != crc32(data[4:]):
raise ChecksumError("Message checksum failed")
(key, cur) = read_int_string(data, cur)
(value, cur) = read_int_string(data, cur)
codec = att & ATTRIBUTE_CODEC_MASK
if codec == CODEC_NONE:
yield (offset, Message(magic, att, key, value))
elif codec == CODEC_GZIP:
gz = gzip_decode(value)
for (offset, msg) in KafkaProtocol._decode_message_set_iter(gz):
yield (offset, msg)
elif codec == CODEC_SNAPPY:
snp = snappy_decode(value)
for (offset, msg) in KafkaProtocol._decode_message_set_iter(snp):
yield (offset, msg)
##################
# Public API #
##################
@classmethod
def encode_produce_request(cls, client_id, correlation_id,
payloads=None, acks=1, timeout=1000):
"""
Encode some ProduceRequest structs
Arguments:
client_id: string
correlation_id: int
payloads: list of ProduceRequest
acks: how many broker acknowledgements the request requires
0: immediate response, the server does not wait for the write
1: written to the local log of the leader
2+: waits for this many replicas to sync
-1: waits for all in-sync replicas
timeout: Maximum time the server will wait for acks from replicas.
This is _not_ a socket timeout
"""
payloads = [] if payloads is None else payloads
grouped_payloads = group_by_topic_and_partition(payloads)
message = []
message.append(cls._encode_message_header(client_id, correlation_id,
KafkaProtocol.PRODUCE_KEY))
message.append(struct.pack('>hii', acks, timeout,
len(grouped_payloads)))
for topic, topic_payloads in grouped_payloads.items():
message.append(struct.pack('>h%dsi' % len(topic), len(topic), topic,
len(topic_payloads)))
for partition, payload in topic_payloads.items():
msg_set = KafkaProtocol._encode_message_set(payload.messages)
message.append(struct.pack('>ii%ds' % len(msg_set), partition,
len(msg_set), msg_set))
msg = b''.join(message)
return struct.pack('>i%ds' % len(msg), len(msg), msg)
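# A worked sketch of encoding a produce request by hand; ProduceRequest here is
# assumed to be the (topic, partition, messages) namedtuple that kafka-python
# 0.9.5 defines in its common module, and the client id / correlation id are
# illustrative:
#
#   msgs = create_message_set([(b'hello', None)], CODEC_NONE)
#   req = ProduceRequest(b'test-topic', 0, msgs)
#   wire = KafkaProtocol.encode_produce_request(b'my-client', 3,
#                                               payloads=[req], acks=1)
#   # 'wire' is the length-prefixed request bytes, ready to write to the broker.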
@classmethod
def decode_produce_response(cls, data):
"""
Decode bytes to a ProduceResponse
Arguments:
data: bytes to decode
"""
((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
for _ in range(num_topics):
((strlen,), cur) = relative_unpack('>h', data, cur)
topic = data[cur:cur + strlen]
cur += strlen
((num_partitions,), cur) = relative_unpack('>i', data, cur)
for _ in range(num_partitions):
((partition, error, offset), cur) = relative_unpack('>ihq',
data, cur)
yield ProduceResponse(topic, partition, error, offset)
@classmethod
def encode_fetch_request(cls, client_id, correlation_id, payloads=None,
max_wait_time=100, min_bytes=4096):
"""
Encodes some FetchRequest structs
Arguments:
client_id: string
correlation_id: int
payloads: list of FetchRequest
max_wait_time: int, how long to block waiting on min_bytes of data
min_bytes: int, the minimum number of bytes to accumulate before
returning the response
"""
payloads = [] if payloads is None else payloads
grouped_payloads = group_by_topic_and_partition(payloads)
message = []
message.append(cls._encode_message_header(client_id, correlation_id,
KafkaProtocol.FETCH_KEY))
# -1 is the replica id
message.append(struct.pack('>iiii', -1, max_wait_time, min_bytes,
len(grouped_payloads)))
for topic, topic_payloads in grouped_payloads.items():
message.append(write_short_string(topic))
message.append(struct.pack('>i', len(topic_payloads)))
for partition, payload in topic_payloads.items():
message.append(struct.pack('>iqi', partition, payload.offset,
payload.max_bytes))
msg = b''.join(message)
return struct.pack('>i%ds' % len(msg), len(msg), msg)
@classmethod
def decode_fetch_response(cls, data):
"""
Decode bytes to a FetchResponse
Arguments:
data: bytes to decode
"""
((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
for _ in range(num_topics):
(topic, cur) = read_short_string(data, cur)
((num_partitions,), cur) = relative_unpack('>i', data, cur)
for j in range(num_partitions):
((partition, error, highwater_mark_offset), cur) = \
relative_unpack('>ihq', data, cur)
(message_set, cur) = read_int_string(data, cur)
yield FetchResponse(
topic, partition, error,
highwater_mark_offset,
KafkaProtocol._decode_message_set_iter(message_set))
@classmethod
def encode_offset_request(cls, client_id, correlation_id, payloads=None):
payloads = [] if payloads is None else payloads
grouped_payloads = group_by_topic_and_partition(payloads)
message = []
message.append(cls._encode_message_header(client_id, correlation_id,
KafkaProtocol.OFFSET_KEY))
# -1 is the replica id
message.append(struct.pack('>ii', -1, len(grouped_payloads)))
for topic, topic_payloads in grouped_payloads.items():
message.append(write_short_string(topic))
message.append(struct.pack('>i', len(topic_payloads)))
for partition, payload in topic_payloads.items():
message.append(struct.pack('>iqi', partition, payload.time,
payload.max_offsets))
msg = b''.join(message)
return struct.pack('>i%ds' % len(msg), len(msg), msg)
@classmethod
def decode_offset_response(cls, data):
"""
Decode bytes to an OffsetResponse
Arguments:
data: bytes to decode
"""
((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
for _ in range(num_topics):
(topic, cur) = read_short_string(data, cur)
((num_partitions,), cur) = relative_unpack('>i', data, cur)
for _ in range(num_partitions):
((partition, error, num_offsets,), cur) = \
relative_unpack('>ihi', data, cur)
offsets = []
for k in range(num_offsets):
((offset,), cur) = relative_unpack('>q', data, cur)
offsets.append(offset)
yield OffsetResponse(topic, partition, error, tuple(offsets))
@classmethod
def encode_metadata_request(cls, client_id, correlation_id, topics=None,
payloads=None):
"""
Encode a MetadataRequest
Arguments:
client_id: string
correlation_id: int
topics: list of strings
"""
if payloads is None:
topics = [] if topics is None else topics
else:
topics = payloads
message = []
message.append(cls._encode_message_header(client_id, correlation_id,
KafkaProtocol.METADATA_KEY))
message.append(struct.pack('>i', len(topics)))
for topic in topics:
message.append(struct.pack('>h%ds' % len(topic), len(topic), topic))
msg = b''.join(message)
return write_int_string(msg)
@classmethod
def decode_metadata_response(cls, data):
"""
Decode bytes to a MetadataResponse
Arguments:
data: bytes to decode
"""
((correlation_id, numbrokers), cur) = relative_unpack('>ii', data, 0)
# Broker info
brokers = []
for _ in range(numbrokers):
((nodeId, ), cur) = relative_unpack('>i', data, cur)
(host, cur) = read_short_string(data, cur)
((port,), cur) = relative_unpack('>i', data, cur)
brokers.append(BrokerMetadata(nodeId, host, port))
# Topic info
((num_topics,), cur) = relative_unpack('>i', data, cur)
topic_metadata = []
for _ in range(num_topics):
((topic_error,), cur) = relative_unpack('>h', data, cur)
(topic_name, cur) = read_short_string(data, cur)
((num_partitions,), cur) = relative_unpack('>i', data, cur)
partition_metadata = []
for _ in range(num_partitions):
((partition_error_code, partition, leader, numReplicas), cur) = \
relative_unpack('>hiii', data, cur)
(replicas, cur) = relative_unpack(
'>%di' % numReplicas, data, cur)
((num_isr,), cur) = relative_unpack('>i', data, cur)
(isr, cur) = relative_unpack('>%di' % num_isr, data, cur)
partition_metadata.append(
PartitionMetadata(topic_name, partition, leader,
replicas, isr, partition_error_code)
)
topic_metadata.append(
TopicMetadata(topic_name, topic_error, partition_metadata)
)
return MetadataResponse(brokers, topic_metadata)
@classmethod
def encode_consumer_metadata_request(cls, client_id, correlation_id, payloads):
"""
Encode a ConsumerMetadataRequest
Arguments:
client_id: string
correlation_id: int
payloads: string (consumer group)
"""
message = []
message.append(cls._encode_message_header(client_id, correlation_id,
KafkaProtocol.CONSUMER_METADATA_KEY))
message.append(struct.pack('>h%ds' % len(payloads), len(payloads), payloads))
msg = b''.join(message)
return write_int_string(msg)
@classmethod
def decode_consumer_metadata_response(cls, data):
"""
Decode bytes to a ConsumerMetadataResponse
Arguments:
data: bytes to decode
"""
((correlation_id, error, nodeId), cur) = relative_unpack('>ihi', data, 0)
(host, cur) = read_short_string(data, cur)
((port,), cur) = relative_unpack('>i', data, cur)
return ConsumerMetadataResponse(error, nodeId, host, port)
@classmethod
def encode_offset_commit_request(cls, client_id, correlation_id,
group, payloads):
"""
Encode some OffsetCommitRequest structs
Arguments:
client_id: string
correlation_id: int
group: string, the consumer group you are committing offsets for
payloads: list of OffsetCommitRequest
"""
grouped_payloads = group_by_topic_and_partition(payloads)
message = []
message.append(cls._encode_message_header(client_id, correlation_id,
KafkaProtocol.OFFSET_COMMIT_KEY))
message.append(write_short_string(group))
message.append(struct.pack('>i', len(grouped_payloads)))
for topic, topic_payloads in grouped_payloads.items():
message.append(write_short_string(topic))
message.append(struct.pack('>i', len(topic_payloads)))
for partition, payload in topic_payloads.items():
message.append(struct.pack('>iq', partition, payload.offset))
message.append(write_short_string(payload.metadata))
msg = b''.join(message)
return struct.pack('>i%ds' % len(msg), len(msg), msg)
@classmethod
def decode_offset_commit_response(cls, data):
"""
Decode bytes to an OffsetCommitResponse
Arguments:
data: bytes to decode
"""
((correlation_id,), cur) = relative_unpack('>i', data, 0)
((num_topics,), cur) = relative_unpack('>i', data, cur)
for _ in xrange(num_topics):
(topic, cur) = read_short_string(data, cur)
((num_partitions,), cur) = relative_unpack('>i', data, cur)
for _ in xrange(num_partitions):
((partition, error), cur) = relative_unpack('>ih', data, cur)
yield OffsetCommitResponse(topic, partition, error)
@classmethod
def encode_offset_fetch_request(cls, client_id, correlation_id,
group, payloads, from_kafka=False):
"""
Encode some OffsetFetchRequest structs. The request is encoded using
version 0 if from_kafka is false, indicating a request for Zookeeper
offsets. It is encoded using version 1 otherwise, indicating a request
for Kafka offsets.
Arguments:
client_id: string
correlation_id: int
group: string, the consumer group you are fetching offsets for
payloads: list of OffsetFetchRequest
from_kafka: bool, default False, set True for Kafka-committed offsets
"""
grouped_payloads = group_by_topic_and_partition(payloads)
message = []
reqver = 1 if from_kafka else 0
message.append(cls._encode_message_header(client_id, correlation_id,
KafkaProtocol.OFFSET_FETCH_KEY,
version=reqver))
message.append(write_short_string(group))
message.append(struct.pack('>i', len(grouped_payloads)))
for topic, topic_payloads in grouped_payloads.items():
message.append(write_short_string(topic))
message.append(struct.pack('>i', len(topic_payloads)))
for partition, payload in topic_payloads.items():
message.append(struct.pack('>i', partition))
msg = b''.join(message)
return struct.pack('>i%ds' % len(msg), len(msg), msg)
@classmethod
def decode_offset_fetch_response(cls, data):
"""
Decode bytes to an OffsetFetchResponse
Arguments:
data: bytes to decode
"""
((correlation_id,), cur) = relative_unpack('>i', data, 0)
((num_topics,), cur) = relative_unpack('>i', data, cur)
for _ in range(num_topics):
(topic, cur) = read_short_string(data, cur)
((num_partitions,), cur) = relative_unpack('>i', data, cur)
for _ in range(num_partitions):
((partition, offset), cur) = relative_unpack('>iq', data, cur)
(metadata, cur) = read_short_string(data, cur)
((error,), cur) = relative_unpack('>h', data, cur)
yield OffsetFetchResponse(topic, partition, offset,
metadata, error)
def create_message(payload, key=None):
"""
Construct a Message
Arguments:
payload: bytes, the payload to send to Kafka
key: bytes, a key used for partition routing (optional)
"""
return Message(0, 0, key, payload)
def create_gzip_message(payloads, key=None, compresslevel=None):
"""
Construct a Gzipped Message containing multiple Messages
The given payloads will be encoded, compressed, and sent as a single atomic
message to Kafka.
Arguments:
payloads: list of (payload, key) tuples to be sent to Kafka
key: bytes, a key used for partition routing (optional)
"""
message_set = KafkaProtocol._encode_message_set(
[create_message(payload, pl_key) for payload, pl_key in payloads])
gzipped = gzip_encode(message_set, compresslevel=compresslevel)
codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP
return Message(0, 0x00 | codec, key, gzipped)
def create_snappy_message(payloads, key=None):
"""
Construct a Snappy Message containing multiple Messages
The given payloads will be encoded, compressed, and sent as a single atomic
message to Kafka.
Arguments:
payloads: list of (payload, key) tuples to be sent to Kafka
key: bytes, a key used for partition routing (optional)
"""
message_set = KafkaProtocol._encode_message_set(
[create_message(payload, pl_key) for payload, pl_key in payloads])
snapped = snappy_encode(message_set)
codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY
return Message(0, 0x00 | codec, key, snapped)
def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None):
"""Create a message set using the given codec.
If codec is CODEC_NONE, return a list of raw Kafka messages. Otherwise,
return a list containing a single codec-encoded message.
"""
if codec == CODEC_NONE:
return [create_message(m, k) for m, k in messages]
elif codec == CODEC_GZIP:
return [create_gzip_message(messages, key, compresslevel)]
elif codec == CODEC_SNAPPY:
return [create_snappy_message(messages, key)]
else:
raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)
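A small sketch of the module-level message helpers above; the payloads and keys are illustrative:

    # CODEC_NONE keeps one Message per payload; gzip/snappy wrap the whole set
    # inside a single compressed Message.
    plain = create_message_set([(b'a', None), (b'b', None)], CODEC_NONE)   # two messages
    packed = create_message_set([(b'a', None), (b'b', None)], CODEC_GZIP)  # one message

    # A single uncompressed message round-trips through encode/decode.
    raw = KafkaProtocol._encode_message(create_message(b'hello', b'key'))
    [(offset, msg)] = list(KafkaProtocol._decode_message(raw, 0))
    assert msg.key == b'key' and msg.value == b'hello'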

View File

@ -0,0 +1,159 @@
import binascii
import collections
import struct
import sys
from threading import Thread, Event
import six
from monasca_common.kafka_lib.common import BufferUnderflowError
def crc32(data):
return binascii.crc32(data) & 0xffffffff
def write_int_string(s):
if s is not None and not isinstance(s, six.binary_type):
raise TypeError('Expected "%s" to be bytes\n'
'data=%s' % (type(s), repr(s)))
if s is None:
return struct.pack('>i', -1)
else:
return struct.pack('>i%ds' % len(s), len(s), s)
def write_short_string(s):
if s is not None and not isinstance(s, six.binary_type):
raise TypeError('Expected "%s" to be bytes\n'
'data=%s' % (type(s), repr(s)))
if s is None:
return struct.pack('>h', -1)
elif len(s) > 32767 and sys.version_info < (2, 7):
# Python 2.6 issues a deprecation warning instead of a struct error
raise struct.error(len(s))
else:
return struct.pack('>h%ds' % len(s), len(s), s)
def read_short_string(data, cur):
if len(data) < cur + 2:
raise BufferUnderflowError("Not enough data left")
(strlen,) = struct.unpack('>h', data[cur:cur + 2])
if strlen == -1:
return None, cur + 2
cur += 2
if len(data) < cur + strlen:
raise BufferUnderflowError("Not enough data left")
out = data[cur:cur + strlen]
return out, cur + strlen
def read_int_string(data, cur):
if len(data) < cur + 4:
raise BufferUnderflowError(
"Not enough data left to read string len (%d < %d)" %
(len(data), cur + 4))
(strlen,) = struct.unpack('>i', data[cur:cur + 4])
if strlen == -1:
return None, cur + 4
cur += 4
if len(data) < cur + strlen:
raise BufferUnderflowError("Not enough data left")
out = data[cur:cur + strlen]
return out, cur + strlen
def relative_unpack(fmt, data, cur):
size = struct.calcsize(fmt)
if len(data) < cur + size:
raise BufferUnderflowError("Not enough data left")
out = struct.unpack(fmt, data[cur:cur + size])
return out, cur + size
def group_by_topic_and_partition(tuples):
out = collections.defaultdict(dict)
for t in tuples:
assert t.topic not in out or t.partition not in out[t.topic], \
'Duplicate {0}s for {1} {2}'.format(t.__class__.__name__,
t.topic, t.partition)
out[t.topic][t.partition] = t
return out
def kafka_bytestring(s):
"""
Takes a string or bytes instance
Returns bytes, encoding strings in utf-8 as necessary
"""
if isinstance(s, six.binary_type):
return s
if isinstance(s, six.string_types):
return s.encode('utf-8')
raise TypeError(s)
class ReentrantTimer(object):
"""
A timer that can be restarted, unlike threading.Timer
(this one is built on a daemon Thread and an Event rather than threading.Timer)
Arguments:
t: timer interval in milliseconds
fn: a callable to invoke
args: tuple of args to be passed to function
kwargs: keyword arguments to be passed to function
"""
def __init__(self, t, fn, *args, **kwargs):
if t <= 0:
raise ValueError('Invalid timeout value')
if not callable(fn):
raise ValueError('fn must be callable')
self.thread = None
self.t = t / 1000.0
self.fn = fn
self.args = args
self.kwargs = kwargs
self.active = None
def _timer(self, active):
# python2.6 Event.wait() always returns None
# python2.7 and greater returns the flag value (true/false)
# we want the flag value, so add an 'or' here for python2.6
# this is redundant for later python versions (FLAG OR FLAG == FLAG)
while not (active.wait(self.t) or active.is_set()):
self.fn(*self.args, **self.kwargs)
def start(self):
if self.thread is not None:
self.stop()
self.active = Event()
self.thread = Thread(target=self._timer, args=(self.active,))
self.thread.daemon = True # So the app exits when main thread exits
self.thread.start()
def stop(self):
if self.thread is None:
return
self.active.set()
self.thread.join(self.t + 1)
# clear the thread reference so the timer can be started again later
self.thread = None
self.fn = None
def __del__(self):
self.stop()
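A quick round-trip through the length-prefixed string helpers above, showing the (value, new_offset) cursor convention used throughout the protocol decoder; the payload bytes are illustrative:

    buf = write_int_string(b'hello') + write_short_string(b'topic')
    value, cur = read_int_string(buf, 0)       # (b'hello', 9): 4-byte length + 5 bytes
    topic, cur = read_short_string(buf, cur)   # (b'topic', 16): 2-byte length + 5 bytes
    assert value == b'hello' and topic == b'topic' and cur == len(buf)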

View File

@ -0,0 +1 @@
__version__ = '0.9.5'

View File

@ -27,8 +27,8 @@ FAKE_KAFKA_TOPIC = "topic"
class TestKafkaProducer(unittest.TestCase):
def setUp(self):
self.kafka_client_patcher = mock.patch('kafka.client')
self.kafka_producer_patcher = mock.patch('kafka.producer')
self.kafka_client_patcher = mock.patch('monasca_common.kafka.producer.kafka_client')
self.kafka_producer_patcher = mock.patch('monasca_common.kafka.producer.kafka_producer')
self.mock_kafka_client = self.kafka_client_patcher.start()
self.mock_kafka_producer = self.kafka_producer_patcher.start()
self.producer = self.mock_kafka_producer.KeyedProducer.return_value
@ -86,9 +86,9 @@ class TestKafkaProducer(unittest.TestCase):
class TestKafkaConsumer(unittest.TestCase):
def setUp(self):
self.kafka_client_patcher = mock.patch('kafka.client')
self.kafka_common_patcher = mock.patch('kafka.common')
self.kafka_consumer_patcher = mock.patch('kafka.consumer')
self.kafka_client_patcher = mock.patch('monasca_common.kafka.consumer.kafka_client')
self.kafka_common_patcher = mock.patch('monasca_common.kafka.consumer.kafka_common')
self.kafka_consumer_patcher = mock.patch('monasca_common.kafka.consumer.kafka_consumer')
self.kazoo_patcher = mock.patch(
'monasca_common.kafka.consumer.KazooClient')

View File

@ -4,7 +4,7 @@
iso8601>=0.1.11 # MIT
six>=1.9.0 # MIT
kazoo>=2.2 # Apache-2.0
kafka-python<1.0.0,>=0.9.5 # Apache-2.0
pykafka>=2.5.0 # Apache 2.0 License
PyMySQL>=0.7.6 # MIT License
oslo.config!=3.18.0,>=3.14.0 # Apache-2.0
pbr>=1.8 # Apache-2.0

View File

@ -33,3 +33,7 @@ max-line-length = 120
builtins = _
exclude=.venv,.git,.tox,dist,*egg,build
show-source = True
# note: Due to the need to fork kafka-python, many pep8 violations occur.
# All of the below ignores are caused by the forked kafka-python library
# so when monasca migrates to pykafka, the below line can be removed.
ignore = E121,E126,E127,E128,E131,E221,E226,E241,E251,E261,E302,E303,E501,E701,F401,H101,H102,H301,H304,H306,H404,H405