Treat metric_group_prefix as config in KafkaConsumer

This commit is contained in:
Dana Powers 2016-08-04 12:19:46 -07:00
parent a698162dc9
commit af08b54875
6 changed files with 17 additions and 19 deletions

View File

@ -42,11 +42,11 @@ class Fetcher(six.Iterator):
'check_crcs': True,
'skip_double_compressed_messages': False,
'iterator_refetch_records': 1, # undocumented -- interface may change
'metric_group_prefix': 'consumer',
'api_version': (0, 8, 0),
}
def __init__(self, client, subscriptions, metrics, metric_group_prefix,
**configs):
def __init__(self, client, subscriptions, metrics, **configs):
"""Initialize a Kafka Message Fetcher.
Keyword Arguments:
@ -94,7 +94,7 @@ class Fetcher(six.Iterator):
self._record_too_large_partitions = dict() # {topic_partition: offset}
self._iterator = None
self._fetch_futures = collections.deque()
self._sensors = FetchManagerMetrics(metrics, metric_group_prefix)
self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])
def init_fetches(self):
"""Send FetchRequests asynchronously for all assigned partitions.

View File

@ -239,6 +239,7 @@ class KafkaConsumer(six.Iterator):
'metric_reporters': [],
'metrics_num_samples': 2,
'metrics_sample_window_ms': 30000,
'metric_group_prefix': 'consumer',
'selector': selectors.DefaultSelector,
'exclude_internal_topics': True,
'sasl_mechanism': None,
@ -268,7 +269,6 @@ class KafkaConsumer(six.Iterator):
tags=metrics_tags)
reporters = [reporter() for reporter in self.config['metric_reporters']]
self._metrics = Metrics(metric_config, reporters)
metric_group_prefix = 'consumer'
# TODO _metrics likely needs to be passed to KafkaClient, etc.
# api_version was previously a str. accept old format for now
@ -289,9 +289,9 @@ class KafkaConsumer(six.Iterator):
self._subscription = SubscriptionState(self.config['auto_offset_reset'])
self._fetcher = Fetcher(
self._client, self._subscription, self._metrics, metric_group_prefix, **self.config)
self._client, self._subscription, self._metrics, **self.config)
self._coordinator = ConsumerCoordinator(
self._client, self._subscription, self._metrics, metric_group_prefix,
self._client, self._subscription, self._metrics,
assignors=self.config['partition_assignment_strategy'],
**self.config)
self._closed = False

View File

@ -55,9 +55,10 @@ class BaseCoordinator(object):
'heartbeat_interval_ms': 3000,
'retry_backoff_ms': 100,
'api_version': (0, 9),
'metric_group_prefix': '',
}
def __init__(self, client, metrics, metric_group_prefix, **configs):
def __init__(self, client, metrics, **configs):
"""
Keyword Arguments:
group_id (str): name of the consumer group to join for dynamic
@ -92,7 +93,7 @@ class BaseCoordinator(object):
self.heartbeat = Heartbeat(**self.config)
self.heartbeat_task = HeartbeatTask(weakref.proxy(self))
self.sensors = GroupCoordinatorMetrics(self.heartbeat, metrics,
metric_group_prefix)
self.config['metric_group_prefix'])
def __del__(self):
if hasattr(self, 'heartbeat_task') and self.heartbeat_task:

View File

@ -37,10 +37,10 @@ class ConsumerCoordinator(BaseCoordinator):
'retry_backoff_ms': 100,
'api_version': (0, 9),
'exclude_internal_topics': True,
'metric_group_prefix': 'consumer'
}
def __init__(self, client, subscription, metrics, metric_group_prefix,
**configs):
def __init__(self, client, subscription, metrics, **configs):
"""Initialize the coordination manager.
Keyword Arguments:
@ -76,9 +76,7 @@ class ConsumerCoordinator(BaseCoordinator):
True the only way to receive records from an internal topic is
subscribing to it. Requires 0.10+. Default: True
"""
super(ConsumerCoordinator, self).__init__(client,
metrics, metric_group_prefix,
**configs)
super(ConsumerCoordinator, self).__init__(client, metrics, **configs)
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
@ -111,7 +109,7 @@ class ConsumerCoordinator(BaseCoordinator):
self._auto_commit_task.reschedule()
self.consumer_sensors = ConsumerCoordinatorMetrics(
metrics, metric_group_prefix, self._subscription)
metrics, self.config['metric_group_prefix'], self._subscription)
def __del__(self):
if hasattr(self, '_cluster') and self._cluster:

View File

@ -29,8 +29,7 @@ def client(conn):
@pytest.fixture
def coordinator(client):
return ConsumerCoordinator(client, SubscriptionState(), Metrics(),
'consumer')
return ConsumerCoordinator(client, SubscriptionState(), Metrics())
def test_init(client, coordinator):
@ -42,7 +41,7 @@ def test_init(client, coordinator):
@pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)])
def test_autocommit_enable_api_version(client, api_version):
coordinator = ConsumerCoordinator(client, SubscriptionState(),
Metrics(), 'consumer',
Metrics(),
enable_auto_commit=True,
group_id='foobar',
api_version=api_version)
@ -362,7 +361,7 @@ def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable,
mock_exc = mocker.patch('kafka.coordinator.consumer.log.exception')
client = KafkaClient(api_version=api_version)
coordinator = ConsumerCoordinator(client, SubscriptionState(),
Metrics(), 'consumer',
Metrics(),
api_version=api_version,
enable_auto_commit=enable,
group_id=group_id)

View File

@ -30,7 +30,7 @@ def fetcher(client, subscription_state):
subscription_state.assign_from_subscribed(assignment)
for tp in assignment:
subscription_state.seek(tp, 0)
return Fetcher(client, subscription_state, Metrics(), 'test_fetcher')
return Fetcher(client, subscription_state, Metrics())
def test_init_fetches(fetcher, mocker):