diff --git a/monasca_persister/tests/test_persister_repo.py b/monasca_persister/tests/test_persister_repo.py
new file mode 100644
index 00000000..fc621e16
--- /dev/null
+++ b/monasca_persister/tests/test_persister_repo.py
@@ -0,0 +1,130 @@
+# (C) Copyright 2019 Fujitsu Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+from mock import patch
+from mock import Mock
+
+from oslotest import base
+from oslo_config import cfg
+
+from monasca_common.kafka import consumer
+from monasca_persister.repositories.persister import Persister
+from monasca_persister.repositories.persister import LOG
+
+
+class FakeException(Exception):
+    pass
+
+
+class TestPersisterRepo(base.BaseTestCase):
+    def setUp(self):
+        super(TestPersisterRepo, self).setUp()
+
+        self._set_patchers()
+        self._set_mocks()
+
+        self.persister = Persister(self.mock_kafka, self.mock_zookeeper, Mock())
+
+    def _set_mocks(self):
+        self.mock_kafka = Mock()
+        self.mock_kafka.topic = 'topic'
+        self.mock_kafka.batch_size = 1
+        self.mock_kafka.zookeeper_path = ''
+        self.mock_kafka.group_id = 0
+        self.mock_kafka.max_wait_time_seconds = 0
+        self.mock_zookeeper = Mock(uri='')
+
+        self.mock_consumer_init = self.patch_kafka_init.start()
+        self.mock_client_init = self.patch_kafka_client_init.start()
+        self.mock_consumer_commit = self.patch_kafka_commit.start()
+        self.mock_log_warning = self.patch_log_warning.start()
+        self.mock_log_exception = self.patch_log_exception.start()
+
+    def _set_patchers(self):
+        self.patch_kafka_init = patch.object(consumer.KafkaConsumer, '__init__',
+                                             return_value=None)
+        self.patch_kafka_commit = patch.object(consumer.KafkaConsumer, 'commit',
+                                               return_value=FakeException())
+        self.patch_kafka_client_init = patch.object(consumer.kafka_client.KafkaClient, '__init__',
+                                                    return_value=None)
+        self.patch_log_warning = patch.object(LOG, 'warning')
+        self.patch_log_exception = patch.object(LOG, 'exception')
+
+    def tearDown(self):
+        super(TestPersisterRepo, self).tearDown()
+
+        self.mock_consumer_init.reset_mock()
+        self.mock_client_init.reset_mock()
+        self.mock_consumer_commit.reset_mock()
+        self.mock_log_warning.reset_mock()
+        self.mock_log_exception.reset_mock()
+
+        self.patch_kafka_init.stop()
+        self.patch_kafka_commit.stop()
+        self.patch_kafka_client_init.stop()
+        self.patch_log_warning.stop()
+        self.patch_log_exception.stop()
+
+    def test_flush_if_data_points_is_none(self):
+        self.persister._data_points = None
+        self.assertIsNone(self.persister._flush())
+
+    def test_run_if_consumer_is_faulty(self):
+        with patch.object(os, '_exit', return_value=None) as mock_exit:
+            self.persister._data_points = []
+            self.persister._consumer = Mock(side_effect=FakeException)
+            self.persister.run()
+            mock_exit.assert_called_once_with(1)
+
+    def test_run_logs_exception_from_consumer(self):
+        with patch.object(self.persister.repository, 'process_message',
+                          side_effect=FakeException):
+            self.persister._data_points = ()
+            self.persister._consumer = ['aa']
+            self.persister.run()
+            self.mock_log_exception.assert_called()
+
+    def test_run_commit_is_called_and_data_points_is_emptied(self):
+        with patch.object(self.persister.repository, 'process_message',
+                          return_value='message'):
+            with patch.object(self.persister, '_consumer', return_value=Mock()) as mock_consumer:
+                self.persister._data_points = ['a']
+                self.persister._consumer.__iter__.return_value = ['aa', 'bb']
+                self.persister._batch_size = 1
+                self.persister.run()
+                mock_consumer.commit.assert_called()
+                self.assertEqual([], self.persister._data_points)
+
+    @patch.object(LOG, 'info', side_effect=FakeException())
+    def test_flush_logs_warning_and_exception(self, mock_log_info):
+        exception_msgs = ['partial write: points beyond retention policy dropped',
+                          'unable to parse points']
+        with(patch.object(cfg.CONF.repositories, 'ignore_parse_point_error',
+                          return_value=True)):
+            for elem in exception_msgs:
+                mock_log_info.side_effect.message = elem
+                self.persister._data_points = ['some']
+                self.persister._flush()
+                self.mock_log_warning.assert_called()
+
+    @patch.object(LOG, 'info', side_effect=FakeException())
+    def test_flush_logs_exception(self, mock_log_info):
+        with(patch.object(cfg.CONF.repositories,
+                          'ignore_parse_point_error', return_value=False)):
+            mock_log_info.side_effect.message = 'some msg'
+            self.persister._data_points = ['some']
+            self.assertRaises(FakeException, self.persister._flush)
+            self.mock_log_exception.assert_called()