diff --git a/lower-constraints.txt b/lower-constraints.txt index 64b391d9..c72e02b4 100644 --- a/lower-constraints.txt +++ b/lower-constraints.txt @@ -22,7 +22,7 @@ keystoneauth1==3.4.0 linecache2==1.0.0 mccabe==0.4.0 mock==2.0.0 -monasca-common==2.7.0 +monasca-common==2.16.0 monotonic==0.6 mox3==0.20.0 msgpack-python==0.4.0 diff --git a/monasca_persister/conf/kafka_common.py b/monasca_persister/conf/kafka_common.py index 1217b075..2877adff 100644 --- a/monasca_persister/conf/kafka_common.py +++ b/monasca_persister/conf/kafka_common.py @@ -42,7 +42,13 @@ kafka_common_opts = [ default=32768), cfg.IntOpt('num_processors', help='Number of processes spawned by persister', - default=1) + default=1), + cfg.BoolOpt('legacy_kafka_client_enabled', + help='Enable legacy Kafka client. When set old version of ' + 'kafka-python library is used. Message format version ' + 'for the brokers should be set to 0.9.0.0 to avoid ' + 'performance issues until all consumers are upgraded.', + default=True) ] kafka_common_group = cfg.OptGroup(name='kafka', diff --git a/monasca_persister/kafka/__init__.py b/monasca_persister/kafka/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/monasca_persister/kafka/confluent_kafka_persister.py b/monasca_persister/kafka/confluent_kafka_persister.py new file mode 100644 index 00000000..a9c5202a --- /dev/null +++ b/monasca_persister/kafka/confluent_kafka_persister.py @@ -0,0 +1,37 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from monasca_common.kafka import client_factory +import six + +from monasca_persister.repositories import persister +from monasca_persister.repositories import singleton + + +@six.add_metaclass(singleton.Singleton) +class ConfluentKafkaPersister(persister.Persister): + + def __init__(self, kafka_conf, repository, client_id=""): + super(ConfluentKafkaPersister, self).__init__(kafka_conf, repository) + self._consumer = client_factory.get_kafka_consumer( + kafka_url=kafka_conf.uri, + kafka_consumer_group=kafka_conf.group_id, + kafka_topic=kafka_conf.topic, + client_id=client_id, + repartition_callback=ConfluentKafkaPersister.flush, + commit_callback=self._flush, + max_commit_interval=kafka_conf.max_wait_time_seconds + ) + + @staticmethod + def flush(kafka_consumer, partitions): + p = ConfluentKafkaPersister() + p._flush() diff --git a/monasca_persister/kafka/legacy_kafka_persister.py b/monasca_persister/kafka/legacy_kafka_persister.py new file mode 100644 index 00000000..8289127d --- /dev/null +++ b/monasca_persister/kafka/legacy_kafka_persister.py @@ -0,0 +1,34 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from monasca_common.kafka import client_factory +import six + +from monasca_persister.repositories import persister +from monasca_persister.repositories import singleton + + +@six.add_metaclass(singleton.Singleton) +class LegacyKafkaPersister(persister.Persister): + + def __init__(self, kafka_conf, zookeeper_conf, repository): + super(LegacyKafkaPersister, self).__init__(kafka_conf, repository) + self._consumer = client_factory.get_kafka_consumer( + kafka_url=kafka_conf.uri, + kafka_consumer_group=kafka_conf.group_id, + kafka_topic=kafka_conf.topic, + zookeeper_url=zookeeper_conf.uri, + zookeeper_path=kafka_conf.zookeeper_path, + use_legacy_client=True, + repartition_callback=self._flush, + commit_callback=self._flush, + max_commit_interval=kafka_conf.max_wait_time_seconds + ) diff --git a/monasca_persister/persister.py b/monasca_persister/persister.py index 637abbaf..ebf92f3c 100644 --- a/monasca_persister/persister.py +++ b/monasca_persister/persister.py @@ -33,7 +33,8 @@ from oslo_config import cfg from oslo_log import log from monasca_persister import config -from monasca_persister.repositories import persister +from monasca_persister.kafka import confluent_kafka_persister +from monasca_persister.kafka import legacy_kafka_persister LOG = log.getLogger(__name__) @@ -91,8 +92,12 @@ def clean_exit(signum, frame=None): def start_process(respository, kafka_config): LOG.info("start process: {}".format(respository)) - m_persister = persister.Persister(kafka_config, cfg.CONF.zookeeper, - respository) + if kafka_config.legacy_kafka_client_enabled: + m_persister = legacy_kafka_persister.LegacyKafkaPersister( + kafka_config, cfg.CONF.zookeeper, respository) + else: + m_persister = confluent_kafka_persister.ConfluentKafkaPersister( + kafka_config, respository) m_persister.run() diff --git a/monasca_persister/repositories/persister.py b/monasca_persister/repositories/persister.py index c66cab84..0be7fff0 100644 --- a/monasca_persister/repositories/persister.py +++ b/monasca_persister/repositories/persister.py @@ -13,36 +13,25 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. +from abc import ABCMeta import os from oslo_config import cfg from oslo_log import log +import six -from monasca_common.kafka import consumer +from monasca_persister.repositories import singleton LOG = log.getLogger(__name__) -class Persister(object): - - def __init__(self, kafka_conf, zookeeper_conf, repository): +@six.add_metaclass(singleton.Singleton) +class Persister(six.with_metaclass(ABCMeta, object)): + def __init__(self, kafka_conf, repository): self._data_points = [] - self._kafka_topic = kafka_conf.topic - self._batch_size = kafka_conf.batch_size - - self._consumer = consumer.KafkaConsumer( - kafka_conf.uri, - zookeeper_conf.uri, - kafka_conf.zookeeper_path, - kafka_conf.group_id, - kafka_conf.topic, - repartition_callback=self._flush, - commit_callback=self._flush, - commit_timeout=kafka_conf.max_wait_time_seconds) - self.repository = repository() def _flush(self): @@ -76,9 +65,8 @@ class Persister(object): def run(self): try: - for raw_message in self._consumer: + for message in self._consumer: try: - message = raw_message[1] data_point = self.repository.process_message(message) self._data_points.append(data_point) except Exception: diff --git a/monasca_persister/repositories/singleton.py b/monasca_persister/repositories/singleton.py new file mode 100644 index 00000000..c69e0d04 --- /dev/null +++ b/monasca_persister/repositories/singleton.py @@ -0,0 +1,22 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +class Singleton(type): + def __init__(cls, name, bases, d): + super(Singleton, cls).__init__(name, bases, d) + cls.instance = None + + def __call__(cls, *args, **kw): + if cls.instance is None: + cls.instance = super(Singleton, cls).__call__(*args, **kw) + return cls.instance diff --git a/monasca_persister/repositories/utils.py b/monasca_persister/repositories/utils.py index fbf14133..154ad94f 100644 --- a/monasca_persister/repositories/utils.py +++ b/monasca_persister/repositories/utils.py @@ -17,7 +17,8 @@ import ujson as json def parse_measurement_message(message): - decoded_message = json.loads(message.message.value) + + decoded_message = json.loads(message.value()) metric = decoded_message['metric'] @@ -39,7 +40,8 @@ def parse_measurement_message(message): def parse_alarm_state_hist_message(message): - decoded_message = json.loads(message.message.value) + + decoded_message = json.loads(message.value()) alarm_transitioned = decoded_message['alarm-transitioned'] @@ -94,7 +96,8 @@ def parse_alarm_state_hist_message(message): def parse_events_message(message): - decoded_message = json.loads(message.message.value) + + decoded_message = json.loads(message.value()) event_type = decoded_message['event']['event_type'] timestamp = decoded_message['event']['timestamp'] payload = decoded_message['event']['payload'] diff --git a/monasca_persister/tests/test_cassandra_alarm_state_history_repository.py b/monasca_persister/tests/test_cassandra_alarm_state_history_repository.py index 3136ddfc..c96c1e08 100644 --- a/monasca_persister/tests/test_cassandra_alarm_state_history_repository.py +++ b/monasca_persister/tests/test_cassandra_alarm_state_history_repository.py @@ -57,7 +57,7 @@ class TestAlarmStateHistoryRepo(base.BaseTestCase): def test_process_message(self): message = Mock() - message.message.value = """{ + message.value.return_value = """{ "alarm-transitioned": { "alarmId": "dummyid", "metrics": "dummymetrics", diff --git a/monasca_persister/tests/test_events.py b/monasca_persister/tests/test_events.py index 95115dca..fd9f5ee0 100644 --- a/monasca_persister/tests/test_events.py +++ b/monasca_persister/tests/test_events.py @@ -11,15 +11,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +from datetime import datetime import json import os + +from mock import patch from oslotest import base +from testtools import matchers + from monasca_persister.repositories.elasticsearch import events_repository from monasca_persister.repositories import utils -from mock import Mock -from testtools import matchers -from datetime import datetime class TestEvents(base.BaseTestCase): @@ -58,10 +59,13 @@ class TestEvents(base.BaseTestCase): self.assertEqual('2017-08-07', normalize_timestamp('2017-08-07 11:22:43')) - def _load_event(self, event_name): + @patch('monasca_common.kafka.legacy_kafka_message') + def _load_event(self, event_name, mock_kafka_message): if self.events is None: filepath = os.path.join(os.path.dirname(__file__), 'events.json') self.events = json.load(open(filepath)) # create a kafka message envelope value = json.dumps(self.events[event_name]) - return Mock(message=Mock(value=value)) + message = mock_kafka_message.LegacyKafkaMessage() + message.value.return_value = value + return message diff --git a/monasca_persister/tests/test_influxdb_alarm_state_history_repository.py b/monasca_persister/tests/test_influxdb_alarm_state_history_repository.py index da169aad..6f55e481 100644 --- a/monasca_persister/tests/test_influxdb_alarm_state_history_repository.py +++ b/monasca_persister/tests/test_influxdb_alarm_state_history_repository.py @@ -35,7 +35,7 @@ class TestInfluxdbAlarmStateHistoryRepo(base.BaseTestCase): def test_process_message(self): message = Mock() - message.message.value = """{ + message.value.return_value = """{ "alarm-transitioned": { "alarmId": "dummyid", "metrics": "dummymetrics", diff --git a/monasca_persister/tests/test_influxdb_metrics_repository.py b/monasca_persister/tests/test_influxdb_metrics_repository.py index d8280d1f..0aab456d 100644 --- a/monasca_persister/tests/test_influxdb_metrics_repository.py +++ b/monasca_persister/tests/test_influxdb_metrics_repository.py @@ -60,4 +60,6 @@ class TestMetricInfluxdbRepository(base.BaseTestCase): "creation_time":1554725988 } ''' - return Mock(message=Mock(value=metric)) + message = Mock() + message.value.return_value = metric + return message diff --git a/monasca_persister/tests/test_persister_main.py b/monasca_persister/tests/test_persister_main.py index 964df307..dc4f590c 100644 --- a/monasca_persister/tests/test_persister_main.py +++ b/monasca_persister/tests/test_persister_main.py @@ -188,7 +188,8 @@ class TestPersister(base.BaseTestCase): fake_kafka_config = Mock() fake_repository = Mock() - with patch('monasca_persister.repositories.persister.Persister') as mock_persister_class: + with patch('monasca_persister.kafka.legacy_kafka_persister' + '.LegacyKafkaPersister') as mock_persister_class: self.persister.start_process(fake_repository, fake_kafka_config) mock_persister_class.assert_called_once_with( diff --git a/monasca_persister/tests/test_persister_repo.py b/monasca_persister/tests/test_persister_repo.py index fc621e16..0bb94b4f 100644 --- a/monasca_persister/tests/test_persister_repo.py +++ b/monasca_persister/tests/test_persister_repo.py @@ -21,7 +21,7 @@ from oslotest import base from oslo_config import cfg from monasca_common.kafka import consumer -from monasca_persister.repositories.persister import Persister +from monasca_persister.kafka.legacy_kafka_persister import LegacyKafkaPersister from monasca_persister.repositories.persister import LOG @@ -36,7 +36,7 @@ class TestPersisterRepo(base.BaseTestCase): self._set_patchers() self._set_mocks() - self.persister = Persister(self.mock_kafka, self.mock_zookeeper, Mock()) + self.persister = LegacyKafkaPersister(self.mock_kafka, self.mock_zookeeper, Mock()) def _set_mocks(self): self.mock_kafka = Mock() @@ -102,7 +102,7 @@ class TestPersisterRepo(base.BaseTestCase): return_value='message'): with patch.object(self.persister, '_consumer', return_value=Mock()) as mock_consumer: self.persister._data_points = ['a'] - self.persister._consumer.__iter__.return_value = ['aa', 'bb'] + self.persister._consumer.__iter__.return_value = ('aa', 'bb') self.persister._batch_size = 1 self.persister.run() mock_consumer.commit.assert_called() diff --git a/monasca_persister/tests/test_utils.py b/monasca_persister/tests/test_utils.py index faca45f0..f2991ee1 100644 --- a/monasca_persister/tests/test_utils.py +++ b/monasca_persister/tests/test_utils.py @@ -27,7 +27,7 @@ class TestUtils(base.BaseTestCase): def test_parse_measurement_message(self): message = Mock() - message.message.value = """{ + message.value.return_value = """{ "metric": { "name": "metric_name", "timestamp": "metric_timestamp", @@ -54,7 +54,7 @@ class TestUtils(base.BaseTestCase): def test_parse_alarm_state_hist_message(self): message = Mock() - message.message.value = """{ + message.value.return_value = """{ "alarm-transitioned": { "alarmId": "dummyid", "metrics": "dummymetrics", @@ -92,7 +92,7 @@ class TestUtils(base.BaseTestCase): def test_parse_events_message(self): message = Mock() - message.message.value = """{ + message.value.return_value = """{ "event": { "event_type": "dummy_event_type", "timestamp": "dummy_timestamp", diff --git a/releasenotes/notes/add_legacy_kafka_client_enabled_option-b6830637029dbca7.yaml b/releasenotes/notes/add_legacy_kafka_client_enabled_option-b6830637029dbca7.yaml new file mode 100644 index 00000000..963a58d1 --- /dev/null +++ b/releasenotes/notes/add_legacy_kafka_client_enabled_option-b6830637029dbca7.yaml @@ -0,0 +1,7 @@ +--- +features: + - | + Configuration option `legacy_kafka_client_enabled` added to allow working + with both legacy kafka-python and new Confluent Kafka client. Please set + message format version for the Kafka brokers to 0.9.0.0 to avoid + performance issues until all consumers are upgraded. diff --git a/requirements.txt b/requirements.txt index 44ca00a1..9754709b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,4 @@ oslo.config>=5.2.0 # Apache-2.0 oslo.log>=3.36.0 # Apache-2.0 six>=1.10.0 # MIT -monasca-common>=2.7.0 # Apache-2.0 +monasca-common>=2.16.0 # Apache-2.0