Updated pep8 checks

* set the maximum line length to 100
* removed unnecessary ignores
* updated code for pep8 compliance
* added the Apache 2.0 license header to files that were missing it

Change-Id: If9398a8fdd094e50d6a07cb73f7389558dbe0a0d
Signed-off-by: Amir Mofakhar <amofakhar@op5.com>
Amir Mofakhar 2018-04-04 13:07:20 +02:00
parent c453d0bb85
commit 806a418ede
29 changed files with 349 additions and 140 deletions
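Reviewer note: most of the churn below is mechanical rewrapping for the new 100-column limit, plus whitespace around operators and comment spacing. As a minimal sketch of what the tightened limit enforces (this approximates pep8's E501 check; the names are illustrative):

    MAX_LINE_LENGTH = 100

    def line_too_long(line):
        # Roughly what pep8's E501 does: measure the physical line without
        # its trailing newline.
        return len(line.rstrip('\n')) > MAX_LINE_LENGTH

    assert line_too_long('x' * 101)
    assert not line_too_long('x' * 100)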

.gitignore
View File

@@ -19,3 +19,5 @@ dist
cover/
.coverage
.testrepository/
.venv
.stestr

View File

@@ -1,8 +1,21 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# __title__ = 'kafka'
from .version import __version__
__author__ = 'David Arthur'
__license__ = 'Apache License 2.0'
__copyright__ = 'Copyright 2015, David Arthur under Apache License, v2.0'
__version__ = __version__
# from monasca_common.kafka_lib.client import KafkaClient
# from monasca_common.kafka_lib.conn import KafkaConnection
@@ -10,7 +23,8 @@ __copyright__ = 'Copyright 2015, David Arthur under Apache License, v2.0'
# create_message, create_gzip_message, create_snappy_message
# )
# from monasca_common.kafka_lib.producer import SimpleProducer, KeyedProducer
# from monasca_common.kafka_lib.partitioner import RoundRobinPartitioner, HashedPartitioner, Murmur2Partitioner
# from monasca_common.kafka_lib.partitioner import RoundRobinPartitioner,
# HashedPartitioner, Murmur2Partitioner
# from monasca_common.kafka_lib.consumer import SimpleConsumer, MultiProcessConsumer, KafkaConsumer
#
# __all__ = [

View File

@@ -1,3 +1,15 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import copy
import functools
@@ -7,12 +19,13 @@ import time
import monasca_common.kafka_lib.common as kafka_common
from monasca_common.kafka_lib.common import (TopicAndPartition, BrokerMetadata,
ConnectionError, FailedPayloadsError,
KafkaTimeoutError, KafkaUnavailableError,
LeaderNotAvailableError, UnknownTopicOrPartitionError,
NotLeaderForPartitionError, ReplicaNotAvailableError)
ConnectionError, FailedPayloadsError,
KafkaTimeoutError, KafkaUnavailableError,
LeaderNotAvailableError, UnknownTopicOrPartitionError,
NotLeaderForPartitionError, ReplicaNotAvailableError)
from monasca_common.kafka_lib.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from monasca_common.kafka_lib.conn import (collect_hosts, KafkaConnection,
DEFAULT_SOCKET_TIMEOUT_SECONDS)
from monasca_common.kafka_lib.protocol import KafkaProtocol
from monasca_common.kafka_lib.util import kafka_bytestring
@@ -44,7 +57,6 @@ class KafkaClient(object):
self.load_metadata_for_topics() # bootstrap with all metadata
##################
# Private API #
##################
@@ -121,7 +133,7 @@ class KafkaClient(object):
def _next_id(self):
"""Generate a new correlation id"""
# modulo to keep w/i int32
self.correlation_id = (self.correlation_id + 1) % 2**31
self.correlation_id = (self.correlation_id + 1) % 2 ** 31
return self.correlation_id
def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn):
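Reviewer note: the modulo in _next_id keeps the correlation id within the signed 32-bit range. A one-line sketch of the wraparound at the boundary:

    # The largest int32 value is 2**31 - 1; the next increment wraps to zero.
    assert (2 ** 31 - 1 + 1) % 2 ** 31 == 0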
@@ -421,10 +433,7 @@ class KafkaClient(object):
def has_metadata_for_topic(self, topic):
topic = kafka_bytestring(topic)
return (
topic in self.topic_partitions
and len(self.topic_partitions[topic]) > 0
)
return topic in self.topic_partitions and len(self.topic_partitions[topic]) > 0
def get_partition_ids_for_topic(self, topic):
topic = kafka_bytestring(topic)
@@ -437,7 +446,7 @@ class KafkaClient(object):
def topics(self):
return list(self.topic_partitions.keys())
def ensure_topic_exists(self, topic, timeout = 30):
def ensure_topic_exists(self, topic, timeout=30):
start_time = time.time()
while not self.has_metadata_for_topic(topic):
@@ -536,7 +545,8 @@ class KafkaClient(object):
# this error code is provided for admin purposes only
# we never talk to replicas, only the leader
except ReplicaNotAvailableError:
log.debug('Some (non-leader) replicas not available for topic %s partition %d', topic, partition)
log.debug('Some (non-leader) replicas not available for topic %s partition %d',
topic, partition)
# If Known Broker, topic_partition -> BrokerMetadata
if leader in self.brokers:
@@ -623,8 +633,8 @@ class KafkaClient(object):
"""
encoder = functools.partial(KafkaProtocol.encode_fetch_request,
max_wait_time=max_wait_time,
min_bytes=min_bytes)
max_wait_time=max_wait_time,
min_bytes=min_bytes)
resps = self._send_broker_aware_request(
payloads, encoder,
@@ -645,8 +655,7 @@ class KafkaClient(object):
def send_offset_commit_request(self, group, payloads=[],
fail_on_error=True, callback=None):
encoder = functools.partial(KafkaProtocol.encode_offset_commit_request,
group=group)
encoder = functools.partial(KafkaProtocol.encode_offset_commit_request, group=group)
decoder = KafkaProtocol.decode_offset_commit_response
resps = self._send_broker_aware_request(payloads, encoder, decoder)
@@ -656,8 +665,7 @@ class KafkaClient(object):
def send_offset_fetch_request(self, group, payloads=[],
fail_on_error=True, callback=None):
encoder = functools.partial(KafkaProtocol.encode_offset_fetch_request,
group=group)
encoder = functools.partial(KafkaProtocol.encode_offset_fetch_request, group=group)
decoder = KafkaProtocol.decode_offset_fetch_response
resps = self._send_broker_aware_request(payloads, encoder, decoder)
@@ -665,10 +673,10 @@ class KafkaClient(object):
if not fail_on_error or not self._raise_on_response_error(resp)]
def send_offset_fetch_request_kafka(self, group, payloads=[],
fail_on_error=True, callback=None):
fail_on_error=True, callback=None):
encoder = functools.partial(KafkaProtocol.encode_offset_fetch_request,
group=group, from_kafka=True)
group=group, from_kafka=True)
decoder = KafkaProtocol.decode_offset_fetch_response
resps = self._send_consumer_aware_request(group, payloads, encoder, decoder)

View File

@@ -1,3 +1,15 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import gzip
from io import BytesIO
import struct
@@ -81,7 +93,7 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024):
if xerial_compatible:
def _chunker():
for i in xrange(0, len(payload), xerial_blocksize):
yield payload[i:i+xerial_blocksize]
yield payload[i:i + xerial_blocksize]
out = BytesIO()
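Reviewer note: the _chunker generator slices the payload into fixed-size blocks for xerial-framed snappy. A standalone sketch of the same slicing, assuming the 32 KiB default blocksize:

    def chunks(payload, blocksize=32 * 1024):
        # Step through the payload in blocksize strides; the final chunk may
        # be shorter than blocksize.
        for i in range(0, len(payload), blocksize):
            yield payload[i:i + blocksize]

    assert list(chunks(b'x' * 70000)) == [b'x' * 32768, b'x' * 32768, b'x' * 4464]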

View File

@@ -1,87 +1,81 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from collections import namedtuple
import inspect
import sys
from collections import namedtuple
###############
# Structs #
###############
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
MetadataRequest = namedtuple("MetadataRequest",
["topics"])
MetadataRequest = namedtuple("MetadataRequest", ["topics"])
MetadataResponse = namedtuple("MetadataResponse",
["brokers", "topics"])
MetadataResponse = namedtuple("MetadataResponse", ["brokers", "topics"])
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ConsumerMetadataRequest
ConsumerMetadataRequest = namedtuple("ConsumerMetadataRequest",
["groups"])
ConsumerMetadataRequest = namedtuple("ConsumerMetadataRequest", ["groups"])
ConsumerMetadataResponse = namedtuple("ConsumerMetadataResponse",
["error", "nodeId", "host", "port"])
["error", "nodeId", "host", "port"])
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI
ProduceRequest = namedtuple("ProduceRequest",
["topic", "partition", "messages"])
ProduceRequest = namedtuple("ProduceRequest", ["topic", "partition", "messages"])
ProduceResponse = namedtuple("ProduceResponse",
["topic", "partition", "error", "offset"])
ProduceResponse = namedtuple("ProduceResponse", ["topic", "partition", "error", "offset"])
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchAPI
FetchRequest = namedtuple("FetchRequest",
["topic", "partition", "offset", "max_bytes"])
FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "max_bytes"])
FetchResponse = namedtuple("FetchResponse",
["topic", "partition", "error", "highwaterMark", "messages"])
["topic", "partition", "error", "highwaterMark", "messages"])
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
OffsetRequest = namedtuple("OffsetRequest",
["topic", "partition", "time", "max_offsets"])
OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "max_offsets"])
OffsetResponse = namedtuple("OffsetResponse",
["topic", "partition", "error", "offsets"])
OffsetResponse = namedtuple("OffsetResponse", ["topic", "partition", "error", "offsets"])
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
OffsetCommitRequest = namedtuple("OffsetCommitRequest",
["topic", "partition", "offset", "metadata"])
["topic", "partition", "offset", "metadata"])
OffsetCommitResponse = namedtuple("OffsetCommitResponse",
["topic", "partition", "error"])
OffsetCommitResponse = namedtuple("OffsetCommitResponse", ["topic", "partition", "error"])
OffsetFetchRequest = namedtuple("OffsetFetchRequest",
["topic", "partition"])
OffsetFetchRequest = namedtuple("OffsetFetchRequest", ["topic", "partition"])
OffsetFetchResponse = namedtuple("OffsetFetchResponse",
["topic", "partition", "offset", "metadata", "error"])
["topic", "partition", "offset", "metadata", "error"])
# Other useful structs
BrokerMetadata = namedtuple("BrokerMetadata",
["nodeId", "host", "port"])
BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"])
TopicMetadata = namedtuple("TopicMetadata",
["topic", "error", "partitions"])
TopicMetadata = namedtuple("TopicMetadata", ["topic", "error", "partitions"])
PartitionMetadata = namedtuple("PartitionMetadata",
["topic", "partition", "leader", "replicas", "isr", "error"])
["topic", "partition", "leader", "replicas", "isr", "error"])
OffsetAndMessage = namedtuple("OffsetAndMessage",
["offset", "message"])
OffsetAndMessage = namedtuple("OffsetAndMessage", ["offset", "message"])
Message = namedtuple("Message",
["magic", "attributes", "key", "value"])
Message = namedtuple("Message", ["magic", "attributes", "key", "value"])
TopicAndPartition = namedtuple("TopicAndPartition",
["topic", "partition"])
TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
KafkaMessage = namedtuple("KafkaMessage",
["topic", "partition", "offset", "key", "value"])
KafkaMessage = namedtuple("KafkaMessage", ["topic", "partition", "offset", "key", "value"])
# Define retry policy for async producer
# Limit value: int >= 0, 0 means no retries
RetryOptions = namedtuple("RetryOptions",
["limit", "backoff_ms", "retry_on_timeouts"])
RetryOptions = namedtuple("RetryOptions", ["limit", "backoff_ms", "retry_on_timeouts"])
#################
@@ -240,7 +234,8 @@ class AsyncProducerQueueFull(KafkaError):
def _iter_broker_errors():
for name, obj in inspect.getmembers(sys.modules[__name__]):
if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and obj != BrokerResponseError:
if inspect.isclass(obj) and issubclass(obj,
BrokerResponseError) and obj != BrokerResponseError:
yield obj
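Reviewer note: _iter_broker_errors discovers every BrokerResponseError subclass defined in its module through inspection. A minimal standalone sketch of the pattern, with illustrative class names:

    import inspect
    import sys

    class BaseError(Exception):
        errno = None

    class NotLeaderError(BaseError):
        errno = 6

    def iter_error_classes(module):
        # Yield each subclass of BaseError defined in the module, skipping
        # the base class itself.
        for _name, obj in inspect.getmembers(module):
            if inspect.isclass(obj) and issubclass(obj, BaseError) and obj is not BaseError:
                yield obj

    assert list(iter_error_classes(sys.modules[__name__])) == [NotLeaderError]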

View File

@@ -1,3 +1,15 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import logging
from random import shuffle

View File

@ -1,6 +1,6 @@
from .simple import SimpleConsumer
from .multiprocess import MultiProcessConsumer
from .kafka import KafkaConsumer
from .multiprocess import MultiProcessConsumer
from .simple import SimpleConsumer
__all__ = [
'SimpleConsumer', 'MultiProcessConsumer', 'KafkaConsumer'

View File

@@ -1,3 +1,15 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import atexit
@@ -5,7 +17,6 @@ import logging
import numbers
from threading import Lock
import monasca_common.kafka_lib.common as kafka_common
from monasca_common.kafka_lib.common import (
OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
UnknownTopicOrPartitionError, check_error, KafkaError
@@ -31,6 +42,7 @@ FULL_QUEUE_WAIT_TIME_SECONDS = 0.1
MAX_BACKOFF_SECONDS = 60
class Consumer(object):
"""
Base class to be used by other consumers. Not to be used directly
@@ -148,7 +160,7 @@ class Consumer(object):
partitions = list(self.offsets.keys())
log.debug('Committing new offsets for %s, partitions %s',
self.topic, partitions)
self.topic, partitions)
for partition in partitions:
offset = self.offsets[partition]
log.debug('Commit offset %d in SimpleConsumer: '
@@ -189,7 +201,7 @@ class Consumer(object):
# py3 supports unregistering
if hasattr(atexit, 'unregister'):
atexit.unregister(self._cleanup_func) # pylint: disable=no-member
atexit.unregister(self._cleanup_func) # pylint: disable=no-member
# py2 requires removing from private attribute...
else:

View File

@@ -1,3 +1,15 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
from collections import namedtuple
@@ -51,6 +63,7 @@ DEPRECATED_CONFIG_KEYS = {
'metadata_broker_list': 'bootstrap_servers',
}
class KafkaConsumer(object):
"""A simpler kafka consumer"""
DEFAULT_CONFIG = deepcopy(DEFAULT_CONSUMER_CONFIG)
@@ -258,7 +271,8 @@ class KafkaConsumer(object):
# or (2) auto reset
else:
self._offsets.fetch[topic_partition] = self._reset_partition_offset(topic_partition)
self._offsets.fetch[topic_partition] = \
self._reset_partition_offset(topic_partition)
# highwater marks (received from server on fetch response)
# and task_done (set locally by user)
@@ -665,7 +679,7 @@ class KafkaConsumer(object):
# Otherwise we should re-raise the upstream exception
# b/c it typically includes additional data about
# the request that triggered it, and we do not want to drop that
raise # pylint: disable-msg=E0704
raise # pylint: disable-msg=E0704
(offset, ) = self.get_partition_offsets(topic, partition,
request_time_ms, max_num_offsets=1)
@@ -682,7 +696,8 @@ class KafkaConsumer(object):
def _check_consumer_timeout(self):
if self._consumer_timeout and time.time() > self._consumer_timeout:
raise ConsumerTimeout('Consumer timed out after %d ms' % + self._config['consumer_timeout_ms'])
raise ConsumerTimeout(
'Consumer timed out after %d ms' % + self._config['consumer_timeout_ms'])
#
# Autocommit private methods
@@ -703,7 +718,8 @@ class KafkaConsumer(object):
self._uncommitted_message_count = 0
self._next_commit_time = None
if self._does_auto_commit_ms():
self._next_commit_time = time.time() + (self._config['auto_commit_interval_ms'] / 1000.0)
self._next_commit_time = time.time() + (
self._config['auto_commit_interval_ms'] / 1000.0)
def _incr_auto_commit_message_count(self, n=1):
self._uncommitted_message_count += n

View File

@@ -1,12 +1,24 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
from collections import namedtuple
import logging
from multiprocessing import Process, Manager as MPManager
try:
import queue # python 3
import queue # python 3
except ImportError:
import Queue as queue # python 2
import Queue as queue # python 2
import time
from ..common import KafkaError
@@ -72,7 +84,8 @@ def _mp_consume(client, group, topic, queue, size, events, **consumer_options):
queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS)
break
except queue.Full:
if events.exit.is_set(): break
if events.exit.is_set():
break
count += 1
@@ -93,9 +106,10 @@ def _mp_consume(client, group, topic, queue, size, events, **consumer_options):
except KafkaError as e:
# Retry with exponential backoff
log.error("Problem communicating with Kafka (%s), retrying in %d seconds..." % (e, interval))
log.error(
"Problem communicating with Kafka (%s), retrying in %d seconds..." % (e, interval))
time.sleep(interval)
interval = interval*2 if interval*2 < MAX_BACKOFF_SECONDS else MAX_BACKOFF_SECONDS
interval = interval * 2 if interval * 2 < MAX_BACKOFF_SECONDS else MAX_BACKOFF_SECONDS
class MultiProcessConsumer(Consumer):
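Reviewer note: the retry path above doubles its sleep interval after every Kafka error and caps it at MAX_BACKOFF_SECONDS. A minimal sketch of that capped exponential backoff (the constant mirrors the module's):

    MAX_BACKOFF_SECONDS = 60

    interval = 1
    waits = []
    for _failure in range(8):
        # Double the wait on each failure, but never sleep longer than the cap.
        interval = min(interval * 2, MAX_BACKOFF_SECONDS)
        waits.append(interval)

    assert waits == [2, 4, 8, 16, 32, 60, 60, 60]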
@@ -150,9 +164,9 @@ class MultiProcessConsumer(Consumer):
manager = MPManager()
self.queue = manager.Queue(1024) # Child consumers dump messages into this
self.events = Events(
start = manager.Event(), # Indicates the consumers to start fetch
exit = manager.Event(), # Requests the consumers to shutdown
pause = manager.Event()) # Requests the consumers to pause fetch
start=manager.Event(), # Indicates the consumers to start fetch
exit=manager.Event(), # Requests the consumers to shutdown
pause=manager.Event()) # Requests the consumers to pause fetch
self.size = manager.Value('i', 0) # Indicator of number of messages to fetch
# dict.keys() returns a view in py3 + it's not a thread-safe operation

View File

@@ -1,14 +1,26 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
try:
from itertools import zip_longest as izip_longest, repeat # pylint: disable-msg=E0611
except ImportError:
from itertools import izip_longest as izip_longest, repeat # python 2
from itertools import izip_longest as izip_longest, repeat # python 2
import logging
try:
import queue # python 3
import queue # python 3
except ImportError:
import Queue as queue # python 2
import Queue as queue # python 2
import sys
import time
@@ -166,7 +178,7 @@ class SimpleConsumer(Consumer):
# Otherwise we should re-raise the upstream exception
# b/c it typically includes additional data about
# the request that triggered it, and we do not want to drop that
raise # pylint: disable-msg=E0704
raise # pylint: disable-msg=E0704
# send_offset_request
log.info('Resetting topic-partition offset to %s for %s:%d',
@@ -198,7 +210,7 @@ class SimpleConsumer(Consumer):
If partition is None, would modify all partitions.
"""
if whence is None: # set an absolute offset
if whence is None: # set an absolute offset
if partition is None:
for tmp_partition in self.offsets:
self.offsets[tmp_partition] = offset
@@ -308,6 +320,7 @@ class SimpleConsumer(Consumer):
def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
update_offset=True):
"""
If no messages can be fetched, returns None.
If get_partition_info is None, it defaults to self.partition_info
@@ -365,8 +378,7 @@ class SimpleConsumer(Consumer):
def _fetch(self):
# Create fetch request payloads for all the partitions
partitions = dict((p, self.buffer_size)
for p in self.fetch_offsets.keys())
partitions = dict((p, self.buffer_size) for p in self.fetch_offsets.keys())
while partitions:
requests = []
for partition, buffer_size in six.iteritems(partitions):
@@ -416,8 +428,9 @@ class SimpleConsumer(Consumer):
try:
for message in resp.messages:
if message.offset < self.fetch_offsets[partition]:
log.debug('Skipping message %s because its offset is less than the consumer offset',
message)
log.debug(
'Skipping message %s because its offset is less'
' than the consumer offset', message)
continue
# Put the message in our queue
self.queue.put((partition, message))

View File

@@ -1,3 +1,15 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Context manager to commit/rollback consumer offsets.
"""

View File

@@ -1,3 +1,15 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class Partitioner(object):
"""

View File

@@ -1,3 +1,15 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
from .base import Partitioner
@@ -70,13 +82,13 @@ def murmur2(key):
for i in range(length4):
i4 = i * 4
k = ((data[i4 + 0] & 0xff) +
((data[i4 + 1] & 0xff) << 8) +
((data[i4 + 2] & 0xff) << 16) +
((data[i4 + 3] & 0xff) << 24))
((data[i4 + 1] & 0xff) << 8) +
((data[i4 + 2] & 0xff) << 16) +
((data[i4 + 3] & 0xff) << 24))
k &= 0xffffffff
k *= m
k &= 0xffffffff
k ^= (k % 0x100000000) >> r # k ^= k >>> r
k ^= (k % 0x100000000) >> r # k ^= k >>> r
k &= 0xffffffff
k *= m
k &= 0xffffffff
@@ -100,11 +112,11 @@ def murmur2(key):
h *= m
h &= 0xffffffff
h ^= (h % 0x100000000) >> 13 # h >>> 13;
h ^= (h % 0x100000000) >> 13 # h >>> 13;
h &= 0xffffffff
h *= m
h &= 0xffffffff
h ^= (h % 0x100000000) >> 15 # h >>> 15;
h ^= (h % 0x100000000) >> 15 # h >>> 15;
h &= 0xffffffff
return h
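Reviewer note: Python has no unsigned right shift, so this murmur2 port reduces values to their unsigned 32-bit form before shifting; for negative ints, % 0x100000000 gives the same result as masking. A minimal sketch of the equivalence with Java's >>>:

    def unsigned_shift_right_32(value, bits):
        # Mask down to the unsigned 32-bit representation, then shift.
        return (value & 0xffffffff) >> bits

    assert unsigned_shift_right_32(-1, 28) == 0xf   # -1 is 0xffffffff unsigned
    assert -1 % 0x100000000 == 0xffffffff           # modulo matches the mask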

View File

@@ -1,7 +1,20 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from itertools import cycle
from .base import Partitioner
class RoundRobinPartitioner(Partitioner):
"""
Implements a round robin partitioner which sends data to partitions

View File

@@ -1,5 +1,5 @@
from .simple import SimpleProducer
from .keyed import KeyedProducer
from .simple import SimpleProducer
__all__ = [
'SimpleProducer', 'KeyedProducer'

View File

@@ -1,3 +1,15 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import atexit
@@ -83,7 +95,9 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
try:
client.reinit()
except Exception as e:
log.warn('Async producer failed to connect to brokers; backoff for %s(ms) before retrying', retry_options.backoff_ms)
log.warn(
'Async producer failed to connect to brokers; backoff for %s(ms) before retrying',
retry_options.backoff_ms)
time.sleep(float(retry_options.backoff_ms) / 1000)
else:
break
@@ -148,7 +162,9 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
}
def _handle_error(error_cls, request):
if issubclass(error_cls, RETRY_ERROR_TYPES) or (retry_options.retry_on_timeouts and issubclass(error_cls, RequestTimedOutError)):
if (issubclass(error_cls, RETRY_ERROR_TYPES) or
(retry_options.retry_on_timeouts and
issubclass(error_cls, RequestTimedOutError))):
reqs_to_retry.append(request)
if issubclass(error_cls, RETRY_BACKOFF_ERROR_TYPES):
retry_state['do_backoff'] |= True
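Reviewer note: _handle_error retries a request when the error class is a known retriable type, or when it is a timeout and retry_on_timeouts is set. A standalone sketch of that classification, with stand-in error types:

    RETRY_ERROR_TYPES = (ConnectionError,)

    class RequestTimedOutError(Exception):
        pass

    def should_retry(error_cls, retry_on_timeouts):
        # Retry on the always-retriable types; retry on timeouts only if the
        # retry options ask for it.
        return (issubclass(error_cls, RETRY_ERROR_TYPES) or
                (retry_on_timeouts and issubclass(error_cls, RequestTimedOutError)))

    assert should_retry(ConnectionError, retry_on_timeouts=False)
    assert should_retry(RequestTimedOutError, retry_on_timeouts=True)
    assert not should_retry(RequestTimedOutError, retry_on_timeouts=False)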
@@ -179,8 +195,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
'to %s:%d with msgs %s',
error_cls.__name__, (i + 1), len(requests),
orig_req.topic, orig_req.partition,
orig_req.messages if log_messages_on_error
else hash(orig_req.messages))
orig_req.messages if log_messages_on_error else hash(orig_req.messages))
if not reqs_to_retry:
request_tries = {}
@@ -203,17 +218,15 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
request_tries = dict(
(key, count + 1)
for (key, count) in request_tries.items()
if key in reqs_to_retry
and (retry_options.limit is None
or (count < retry_options.limit))
if key in reqs_to_retry and (retry_options.limit is None or
(count < retry_options.limit))
)
# Log messages we are going to retry
for orig_req in request_tries.keys():
log.info('Retrying ProduceRequest to %s:%d with msgs %s',
orig_req.topic, orig_req.partition,
orig_req.messages if log_messages_on_error
else hash(orig_req.messages))
orig_req.messages if log_messages_on_error else hash(orig_req.messages))
if request_tries or not queue.empty():
log.error('Stopped producer with {0} unsent messages'
@@ -282,7 +295,7 @@ class Producer(object):
codec_compresslevel=None,
sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT,
async=False,
batch_send=False, # deprecated, use async
batch_send=False, # deprecated, use async
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
async_retry_limit=ASYNC_RETRY_LIMIT,
@@ -403,7 +416,8 @@ class Producer(object):
'Current queue size %d.' % self.queue.qsize())
resp = []
else:
messages = create_message_set([(m, key) for m in msg], self.codec, key, self.codec_compresslevel)
messages = create_message_set(
[(m, key) for m in msg], self.codec, key, self.codec_compresslevel)
req = ProduceRequest(topic, partition, messages)
try:
resp = self.client.send_produce_request(
@@ -441,7 +455,7 @@ class Producer(object):
# py3 supports unregistering
if hasattr(atexit, 'unregister'):
atexit.unregister(self._cleanup_func) # pylint: disable=no-member
atexit.unregister(self._cleanup_func) # pylint: disable=no-member
# py2 requires removing from private attribute...
else:

View File

@@ -1,12 +1,23 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import logging
import warnings
from .base import Producer
from ..partitioner import HashedPartitioner
from ..util import kafka_bytestring
from .base import Producer
log = logging.getLogger(__name__)
@@ -32,7 +43,8 @@ class KeyedProducer(Producer):
if not self.client.has_metadata_for_topic(topic):
self.client.load_metadata_for_topics(topic)
self.partitioners[topic] = self.partitioner_class(self.client.get_partition_ids_for_topic(topic))
self.partitioners[topic] = self.partitioner_class(
self.client.get_partition_ids_for_topic(topic))
partitioner = self.partitioners[topic]
return partitioner.partition(key)
@@ -44,7 +56,8 @@ class KeyedProducer(Producer):
# DEPRECATED
def send(self, topic, key, msg):
warnings.warn("KeyedProducer.send is deprecated in favor of send_messages", DeprecationWarning)
warnings.warn("KeyedProducer.send is deprecated in favor of send_messages",
DeprecationWarning)
return self.send_messages(topic, key, msg)
def __repr__(self):

View File

@@ -1,3 +1,15 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
from itertools import cycle
@@ -40,7 +52,7 @@ class SimpleProducer(Producer):
# Randomize the initial partition that is returned
if self.random_start:
num_partitions = len(self.client.get_partition_ids_for_topic(topic))
for _ in xrange(random.randint(0, num_partitions-1)):
for _ in xrange(random.randint(0, num_partitions - 1)):
next(self.partition_cycles[topic])
return next(self.partition_cycles[topic])

View File

@@ -1,15 +1,25 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import struct
import six
from six.moves import xrange
from monasca_common.kafka_lib.codec import (
gzip_encode, gzip_decode, snappy_encode, snappy_decode
)
from monasca_common.kafka_lib.common import (
Message, OffsetAndMessage, TopicAndPartition,
Message, OffsetAndMessage,
BrokerMetadata, TopicMetadata, PartitionMetadata,
MetadataResponse, ProduceResponse, FetchResponse,
OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,

View File

@@ -1,3 +1,15 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import binascii
import collections
import struct
@@ -83,8 +95,7 @@ def group_by_topic_and_partition(tuples):
out = collections.defaultdict(dict)
for t in tuples:
assert t.topic not in out or t.partition not in out[t.topic], \
'Duplicate {0}s for {1} {2}'.format(t.__class__.__name__,
t.topic, t.partition)
'Duplicate {0}s for {1} {2}'.format(t.__class__.__name__, t.topic, t.partition)
out[t.topic][t.partition] = t
return out

View File

@@ -13,10 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import six
import sys
import time
import pyparsing
@@ -56,8 +53,7 @@ dimension = dimension_name + dim_comparison_op + dimension_value
dimension.setParseAction(query_structures.Dimension)
dimension_list = pyparsing.Group((LBRACE + pyparsing.Optional(
pyparsing.delimitedList(dimension)) +
RBRACE))
pyparsing.delimitedList(dimension)) + RBRACE))
metric = (metric_name + pyparsing.Optional(dimension_list) |
pyparsing.Optional(metric_name) + dimension_list)

View File

@@ -11,4 +11,5 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
PAGE_LIMIT = 50

View File

@@ -11,6 +11,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo_config import cfg

View File

@@ -57,7 +57,6 @@ class PolicyFileTestCase(base.BaseTestCase):
self.context, action, self.target)
class PolicyTestCase(base.BaseTestCase):
def setUp(self):
super(PolicyTestCase, self).setUp()
@@ -73,7 +72,7 @@ class PolicyTestCase(base.BaseTestCase):
os_policy.RuleDefault("example:uppercase_admin",
"role:ADMIN or role:sysadmin"),
os_policy.RuleDefault("example:get_http",
"http://www.example.com"),
"http://www.example.com"),
os_policy.RuleDefault("example:my_file",
"role:compute_admin or "
"project_id:%(project_id)s"),

View File

@@ -18,7 +18,6 @@ import pyparsing
from monasca_common.monasca_query_language import aql_parser
from monasca_common.monasca_query_language import exceptions
from monasca_common.monasca_query_language import query_structures
class TestMonascaQueryLanguage(base.BaseTestCase):
@@ -84,8 +83,8 @@ class TestMonascaQueryLanguage(base.BaseTestCase):
"source metric_one targets metric_two excluding metric_three group by hostname",
"source metric_one targets metric_two group by hostname",
"source metric_one group by hostname",
"source {__severity__=HIGH} targets {__severity__=LOW} excluding "
"{__alarmName__=alarm_one} group by __alarmName__"
"source {__severity__=HIGH} targets {__severity__=LOW} excluding"
" {__alarmName__=alarm_one} group by __alarmName__"
]
negative_expressions = [
"targets metric_two source_metric_one"

View File

@@ -89,10 +89,12 @@ class TestSimport(base.BaseTestCase):
# 'full_path/monasca-common/monasca_common/tests/external/externalmodule.py'>
#
# while Python 3:
# <module 'external.externalmodule' from 'full_path/monasca-common/monasca_common/tests/external/externalmodule.py'>
# <module 'external.externalmodule' from
# 'full_path/monasca-common/monasca_common/tests/external/externalmodule.py'>
# , that's why we need to provide different module names for simport in Python 2 and 3
#
if six.PY2:
class TestSimportPY2(base.BaseTestCase):
@@ -107,7 +109,8 @@ if six.PY2:
def test_good_load_external(self):
method = simport.load(PWD + "/external|monasca_common.tests.external.externalmodule:Blah.method_b")
method = simport.load(
PWD + "/external|monasca_common.tests.external.externalmodule:Blah.method_b")
self.assertTrue('monasca_common.tests.external.externalmodule' in sys.modules)
old = sys.modules['monasca_common.tests.external.externalmodule']
@@ -119,7 +122,8 @@ if six.PY2:
self.assertEqual(method, external.externalmodule.Blah.method_b)
def test_import_class(self):
klass = simport.load(PWD + "/external|monasca_common.tests.external.externalmodule:Blah")
klass = simport.load(
PWD + "/external|monasca_common.tests.external.externalmodule:Blah")
import external.externalmodule
self.assertEqual(klass, external.externalmodule.Blah)

View File

@@ -14,8 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from oslotest import base
import codecs
from oslotest import base
import six
from monasca_common.validation import metrics as metric_validator
@@ -35,6 +35,7 @@ def _hex_to_unicode(hex_raw):
hex_str = hex_str_raw.decode('utf-8', 'replace')
return hex_str
# NOTE(trebskit) => http://www.cl.cam.ac.uk/~mgk25/ucs/examples/UTF-8-test.txt
UNICODE_MESSAGES = [
# 1 correct UTF-8 text
@@ -207,7 +208,7 @@ class TestMetricValidation(base.BaseTestCase):
def test_invalid_dimension_key_length(self):
metric = {"name": "test_metric_name",
"dimensions": {'A'*256: 'B', 'B': 'C', 'D': 'E'},
"dimensions": {'A' * 256: 'B', 'B': 'C', 'D': 'E'},
"timestamp": 1405630174123,
"value": 5}
self.assertRaisesRegex(
@@ -217,7 +218,7 @@ class TestMetricValidation(base.BaseTestCase):
def test_invalid_dimension_value_length(self):
metric = {"name": "test_metric_name",
"dimensions": {'A': 'B', 'B': 'C'*256, 'D': 'E'},
"dimensions": {'A': 'B', 'B': 'C' * 256, 'D': 'E'},
"timestamp": 1405630174123,
"value": 5}
self.assertRaisesRegex(
@@ -324,7 +325,7 @@ class TestMetricValidation(base.BaseTestCase):
def test_invalid_dimension_value_chars(self):
for c in invalid_dimension_chars:
metric = {"name": "test_name",
"dimensions": {'test-key': 'test{}value'.format(c)},
"dimensions": {'test-key': 'test{}value'.format(c)},
"timestamp": 1405630174123,
"value": 5}
self.assertRaisesRegex(
@@ -378,7 +379,8 @@ class TestMetricValidation(base.BaseTestCase):
def test_invalid_too_large_value_meta(self):
value_meta_value = ""
num_value_meta = 10
for i in six.moves.range(0, int(metric_validator.VALUE_META_VALUE_MAX_LENGTH / num_value_meta)):
for i in six.moves.range(
0, int(metric_validator.VALUE_META_VALUE_MAX_LENGTH / num_value_meta)):
value_meta_value = '{}{}'.format(value_meta_value, '1')
value_meta = {}
for i in six.moves.range(0, num_value_meta):

View File

@@ -74,14 +74,14 @@ commands = bindep test
[flake8]
max-complexity = 50
max-line-length = 120
max-line-length = 100
builtins = _
exclude=.venv,.git,.tox,dist,*egg,build
show-source = True
# note: Due to the need to fork kafka-python, many pep8 violations occur.
# All of the below ignores are caused by the forked kafka-python library
# so when monasca migrates to pykafka, the below line can be removed.
ignore = E121,E126,E127,E128,E131,E221,E226,E241,E251,E261,E302,E303,E501,E701,F401,H101,H102,H301,H304,H306,H404,H405
ignore = H101,H301,H404,H405
[testenv:lower-constraints]
basepython = python3
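Reviewer note: the ignores that remain are OpenStack hacking checks (H codes) rather than core pep8 codes; as I read them, H101 covers TODO author tags, H301 one-import-per-line, and H404/H405 docstring layout. A hedged sketch of code the retained H301 ignore keeps tolerating in the forked kafka_lib tree (illustrative, not taken from this diff):

    import os, sys  # hacking's H301 would normally require one import per line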