monasca-common/monasca_common/kafka_lib/common.py

266 lines
7.2 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from collections import namedtuple
import inspect
import sys
###############
# Structs #
###############
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
MetadataRequest = namedtuple("MetadataRequest", ["topics"])
MetadataResponse = namedtuple("MetadataResponse", ["brokers", "topics"])
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ConsumerMetadataRequest
ConsumerMetadataRequest = namedtuple("ConsumerMetadataRequest", ["groups"])
ConsumerMetadataResponse = namedtuple("ConsumerMetadataResponse",
["error", "nodeId", "host", "port"])
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI
ProduceRequest = namedtuple("ProduceRequest", ["topic", "partition", "messages"])
ProduceResponse = namedtuple("ProduceResponse", ["topic", "partition", "error", "offset"])
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchAPI
FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "max_bytes"])
FetchResponse = namedtuple("FetchResponse",
["topic", "partition", "error", "highwaterMark", "messages"])
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "max_offsets"])
OffsetResponse = namedtuple("OffsetResponse", ["topic", "partition", "error", "offsets"])
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
OffsetCommitRequest = namedtuple("OffsetCommitRequest",
["topic", "partition", "offset", "metadata"])
OffsetCommitResponse = namedtuple("OffsetCommitResponse", ["topic", "partition", "error"])
OffsetFetchRequest = namedtuple("OffsetFetchRequest", ["topic", "partition"])
OffsetFetchResponse = namedtuple("OffsetFetchResponse",
["topic", "partition", "offset", "metadata", "error"])
# Other useful structs
BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"])
TopicMetadata = namedtuple("TopicMetadata", ["topic", "error", "partitions"])
PartitionMetadata = namedtuple("PartitionMetadata",
["topic", "partition", "leader", "replicas", "isr", "error"])
OffsetAndMessage = namedtuple("OffsetAndMessage", ["offset", "message"])
Message = namedtuple("Message", ["magic", "attributes", "key", "value"])
TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
KafkaMessage = namedtuple("KafkaMessage", ["topic", "partition", "offset", "key", "value"])
# Define retry policy for async producer
# Limit value: int >= 0, 0 means no retries
RetryOptions = namedtuple("RetryOptions", ["limit", "backoff_ms", "retry_on_timeouts"])
#################
# Exceptions #
#################
class KafkaError(RuntimeError):
pass
class BrokerResponseError(KafkaError):
pass
class UnknownError(BrokerResponseError):
errno = -1
message = 'UNKNOWN'
class OffsetOutOfRangeError(BrokerResponseError):
errno = 1
message = 'OFFSET_OUT_OF_RANGE'
class InvalidMessageError(BrokerResponseError):
errno = 2
message = 'INVALID_MESSAGE'
class UnknownTopicOrPartitionError(BrokerResponseError):
errno = 3
message = 'UNKNOWN_TOPIC_OR_PARTITON'
class InvalidFetchRequestError(BrokerResponseError):
errno = 4
message = 'INVALID_FETCH_SIZE'
class LeaderNotAvailableError(BrokerResponseError):
errno = 5
message = 'LEADER_NOT_AVAILABLE'
class NotLeaderForPartitionError(BrokerResponseError):
errno = 6
message = 'NOT_LEADER_FOR_PARTITION'
class RequestTimedOutError(BrokerResponseError):
errno = 7
message = 'REQUEST_TIMED_OUT'
class BrokerNotAvailableError(BrokerResponseError):
errno = 8
message = 'BROKER_NOT_AVAILABLE'
class ReplicaNotAvailableError(BrokerResponseError):
errno = 9
message = 'REPLICA_NOT_AVAILABLE'
class MessageSizeTooLargeError(BrokerResponseError):
errno = 10
message = 'MESSAGE_SIZE_TOO_LARGE'
class StaleControllerEpochError(BrokerResponseError):
errno = 11
message = 'STALE_CONTROLLER_EPOCH'
class OffsetMetadataTooLargeError(BrokerResponseError):
errno = 12
message = 'OFFSET_METADATA_TOO_LARGE'
class StaleLeaderEpochCodeError(BrokerResponseError):
errno = 13
message = 'STALE_LEADER_EPOCH_CODE'
class OffsetsLoadInProgressCode(BrokerResponseError):
errno = 14
message = 'OFFSETS_LOAD_IN_PROGRESS_CODE'
class ConsumerCoordinatorNotAvailableCode(BrokerResponseError):
errno = 15
message = 'CONSUMER_COORDINATOR_NOT_AVAILABLE_CODE'
class NotCoordinatorForConsumerCode(BrokerResponseError):
errno = 16
message = 'NOT_COORDINATOR_FOR_CONSUMER_CODE'
class KafkaUnavailableError(KafkaError):
pass
class KafkaTimeoutError(KafkaError):
pass
class FailedPayloadsError(KafkaError):
def __init__(self, payload, *args):
super(FailedPayloadsError, self).__init__(*args)
self.payload = payload
class ConnectionError(KafkaError):
pass
class BufferUnderflowError(KafkaError):
pass
class ChecksumError(KafkaError):
pass
class ConsumerFetchSizeTooSmall(KafkaError):
pass
class ConsumerNoMoreData(KafkaError):
pass
class ConsumerTimeout(KafkaError):
pass
class ProtocolError(KafkaError):
pass
class UnsupportedCodecError(KafkaError):
pass
class KafkaConfigurationError(KafkaError):
pass
class AsyncProducerQueueFull(KafkaError):
def __init__(self, failed_msgs, *args):
super(AsyncProducerQueueFull, self).__init__(*args)
self.failed_msgs = failed_msgs
def _iter_broker_errors():
for name, obj in inspect.getmembers(sys.modules[__name__]):
if inspect.isclass(obj) and issubclass(obj,
BrokerResponseError) and obj != BrokerResponseError:
yield obj
kafka_errors = dict([(x.errno, x) for x in _iter_broker_errors()])
def check_error(response):
if isinstance(response, Exception):
raise response
if response.error:
error_class = kafka_errors.get(response.error, UnknownError)
raise error_class(response)
RETRY_BACKOFF_ERROR_TYPES = (
KafkaUnavailableError, LeaderNotAvailableError,
ConnectionError, FailedPayloadsError
)
RETRY_REFRESH_ERROR_TYPES = (
NotLeaderForPartitionError, UnknownTopicOrPartitionError,
LeaderNotAvailableError, ConnectionError
)
RETRY_ERROR_TYPES = RETRY_BACKOFF_ERROR_TYPES + RETRY_REFRESH_ERROR_TYPES