diff --git a/almanach/collector/filters/__init__.py b/almanach/collector/filters/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/almanach/collector/filters/base_filter.py b/almanach/collector/filters/base_filter.py new file mode 100644 index 0000000..d34cc1a --- /dev/null +++ b/almanach/collector/filters/base_filter.py @@ -0,0 +1,24 @@ +# Copyright 2017 Internap. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import six + + +@six.add_metaclass(abc.ABCMeta) +class BaseFilter(object): + + @abc.abstractmethod + def ignore_notification(self, notification): + pass diff --git a/almanach/collector/filters/delete_instance_before_completion_filter.py b/almanach/collector/filters/delete_instance_before_completion_filter.py new file mode 100644 index 0000000..7df1261 --- /dev/null +++ b/almanach/collector/filters/delete_instance_before_completion_filter.py @@ -0,0 +1,51 @@ +# Copyright 2017 Internap. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging + +from almanach.collector.filters import base_filter +from almanach.core.helpers import date_helper + +LOG = logging.getLogger(__name__) + + +class DeleteInstanceBeforeCompletionFilter(base_filter.BaseFilter): + + def __init__(self, config): + self.config = config + + def ignore_notification(self, notification): + delta = self.config.entities.instance_existence_threshold + + if self._has_been_retried(notification) \ + and self._is_instance_deleted(notification) \ + and self._was_never_created_successfully(notification, delta): + LOG.info('Instance %s was never created successfully during %d seconds', + notification.payload.get('instance_id'), + delta) + return True + return False + + @staticmethod + def _is_instance_deleted(notification): + return notification.payload.get('state') == 'deleted' + + def _has_been_retried(self, notification): + return notification.get_retry_counter() >= self.config.collector.min_retries + + @staticmethod + def _was_never_created_successfully(notification, delta): + creation_date = date_helper.DateHelper().parse(notification.payload.get('created_at')) + deletion_date = date_helper.DateHelper().parse(notification.payload.get('deleted_at')) + return date_helper.DateHelper().is_within_range(creation_date, deletion_date, delta) diff --git a/almanach/collector/filters/errored_instance_filter.py b/almanach/collector/filters/errored_instance_filter.py new file mode 100644 index 0000000..66a0ec7 --- /dev/null +++ b/almanach/collector/filters/errored_instance_filter.py @@ -0,0 +1,29 @@ +# Copyright 2017 Internap. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging + +from almanach.collector.filters import base_filter + +LOG = logging.getLogger(__name__) + + +class ErroredInstanceFilter(base_filter.BaseFilter): + + def ignore_notification(self, notification): + if notification.payload.get('state') == 'error': + LOG.info('Instance deletion event ignored because instance %s was badly created', + notification.payload.get('instance_id')) + return True + return False diff --git a/almanach/collector/handlers/instance_handler.py b/almanach/collector/handlers/instance_handler.py index fd27cdc..6d8c16e 100644 --- a/almanach/collector/handlers/instance_handler.py +++ b/almanach/collector/handlers/instance_handler.py @@ -22,9 +22,10 @@ LOG = logging.getLogger(__name__) class InstanceHandler(base_handler.BaseHandler): - def __init__(self, controller): + def __init__(self, controller, on_delete_filter): super(InstanceHandler, self).__init__() self.controller = controller + self.on_delete_filter = on_delete_filter def handle_events(self, notification): if notification.event_type == "compute.instance.create.end": @@ -54,10 +55,7 @@ class InstanceHandler(base_handler.BaseHandler): try: self.controller.delete_instance(instance_id, date) except exception.EntityNotFoundException as e: - if notification.payload.get('state') == 'error': - LOG.info('Instance deletion event ignored because instance %s was badly created', - instance_id) - else: + if not self.on_delete_filter.ignore_notification(notification): raise e def _on_instance_resized(self, notification): diff --git a/almanach/collector/notification.py b/almanach/collector/notification.py index f61526e..2c3c797 100644 --- a/almanach/collector/notification.py +++ b/almanach/collector/notification.py @@ -19,6 +19,18 @@ from oslo_log import log as logging LOG = logging.getLogger(__name__) +class NotificationFilter(object): + + def __init__(self): + self.filters = [] + + def add(self, notification_filter): + self.filters.append(notification_filter) + + def ignore_notification(self, notification): + return any([f.ignore_notification(notification) for f in self.filters]) + + class NotificationMessage(object): RETRY_COUNTER = 'retry_count' diff --git a/almanach/collector/service.py b/almanach/collector/service.py index 3f40e21..4f58a18 100644 --- a/almanach/collector/service.py +++ b/almanach/collector/service.py @@ -15,6 +15,8 @@ from oslo_log import log as logging from oslo_service import service +from almanach.collector.filters import delete_instance_before_completion_filter +from almanach.collector.filters import errored_instance_filter from almanach.collector.handlers import instance_handler from almanach.collector.handlers import volume_handler from almanach.collector.handlers import volume_type_handler @@ -57,18 +59,26 @@ class ServiceFactory(object): messaging_factory = messaging.MessagingFactory(self.config) notification_handler = notification.NotificationHandler(self.config, messaging_factory) - notification_handler.add_event_handler(self._get_instance_handler()) - notification_handler.add_event_handler(self._get_volume_handler()) - notification_handler.add_event_handler(self._get_volume_type_handler()) + notification_handler.add_event_handler(self.get_instance_handler()) + notification_handler.add_event_handler(self.get_volume_handler()) + notification_handler.add_event_handler(self.get_volume_type_handler()) listeners = messaging_factory.get_listeners(notification_handler) return CollectorService(listeners) - def _get_instance_handler(self): - return instance_handler.InstanceHandler(self.core_factory.get_instance_controller()) + def get_instance_handler(self): + return instance_handler.InstanceHandler( + self.core_factory.get_instance_controller(), + self.get_on_delete_filters()) - def _get_volume_handler(self): + def get_volume_handler(self): return volume_handler.VolumeHandler(self.core_factory.get_volume_controller()) - def _get_volume_type_handler(self): + def get_volume_type_handler(self): return volume_type_handler.VolumeTypeHandler(self.core_factory.get_volume_type_controller()) + + def get_on_delete_filters(self): + filters = notification.NotificationFilter() + filters.add(errored_instance_filter.ErroredInstanceFilter()) + filters.add(delete_instance_before_completion_filter.DeleteInstanceBeforeCompletionFilter(self.config)) + return filters diff --git a/almanach/core/helpers/__init__.py b/almanach/core/helpers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/almanach/core/helpers/date_helper.py b/almanach/core/helpers/date_helper.py new file mode 100644 index 0000000..04ca5ef --- /dev/null +++ b/almanach/core/helpers/date_helper.py @@ -0,0 +1,42 @@ +# Copyright 2017 Internap. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime +from dateutil import parser +import pytz + +from almanach.core import exception + + +class DateHelper(object): + + @staticmethod + def parse(value): + try: + dt = value if isinstance(value, datetime) else parser.parse(value) + return DateHelper._normalize_timezone(dt) + except (TypeError, ValueError): + raise exception.DateFormatException() + + @staticmethod + def is_within_range(d1, d2, delta): + diff = (d2 - d1).total_seconds() + return abs(diff) < delta + + @staticmethod + def _normalize_timezone(dt): + try: + return dt.astimezone(pytz.utc) + except ValueError: + return dt.replace(tzinfo=pytz.utc) diff --git a/almanach/core/opts.py b/almanach/core/opts.py index 1f7ae93..ea0f8c7 100644 --- a/almanach/core/opts.py +++ b/almanach/core/opts.py @@ -46,10 +46,13 @@ collector_opts = [ default='almanach', help='AMQP topic used for OpenStack notifications'), cfg.IntOpt('max_retries', + default=5, + help='Number of retries before to send message to critical queue'), + cfg.IntOpt('min_retries', default=3, - help='Maximal number of message retries'), + help='Number of retries before to use filters to discard notifications'), cfg.IntOpt('retry_delay', - default=10, + default=25, help='Delay in seconds between retries'), ] @@ -87,6 +90,9 @@ auth_opts = [ ] entity_opts = [ + cfg.IntOpt('instance_existence_threshold', + default=900, + help='Instance existence threshold'), cfg.IntOpt('volume_existence_threshold', default=60, help='Volume existence threshold'), diff --git a/almanach/tests/unit/collector/filters/__init__.py b/almanach/tests/unit/collector/filters/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/almanach/tests/unit/collector/filters/test_delete_instance_before_completion_filter.py b/almanach/tests/unit/collector/filters/test_delete_instance_before_completion_filter.py new file mode 100644 index 0000000..f77da90 --- /dev/null +++ b/almanach/tests/unit/collector/filters/test_delete_instance_before_completion_filter.py @@ -0,0 +1,58 @@ +# Copyright 2017 Internap. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from almanach.collector.filters import delete_instance_before_completion_filter +from almanach.collector import notification + +from almanach.tests.unit import base +from almanach.tests.unit.builders import notification as builder + + +class TestDeleteInstanceBeforeCompletionFilter(base.BaseTestCase): + + def setUp(self): + super(TestDeleteInstanceBeforeCompletionFilter, self).setUp() + self.filter = delete_instance_before_completion_filter.DeleteInstanceBeforeCompletionFilter(self.config) + + def test_with_instance_deleted_quickly(self): + message = builder.InstanceNotificationBuilder() \ + .with_event_type('compute.instance.delete.end') \ + .with_payload_value('state', 'deleted') \ + .with_payload_value('created_at', '2017-02-05 11:25:01+00:00') \ + .with_payload_value('deleted_at', '2017-02-05T11:34:36.000000') \ + .with_context_value(notification.NotificationMessage.RETRY_COUNTER, 3) \ + .build() + + self.assertTrue(self.filter.ignore_notification(message)) + + def test_when_message_was_never_retried(self): + message = builder.InstanceNotificationBuilder() \ + .with_event_type('compute.instance.delete.end') \ + .with_payload_value('state', 'deleted') \ + .with_payload_value('created_at', '2017-02-05 11:25:01+00:00') \ + .with_payload_value('deleted_at', '2017-02-05T11:34:36.000000') \ + .build() + + self.assertFalse(self.filter.ignore_notification(message)) + + def test_with_instance_deleted_later(self): + message = builder.InstanceNotificationBuilder() \ + .with_event_type('compute.instance.delete.end') \ + .with_payload_value('state', 'deleted') \ + .with_payload_value('created_at', '2017-02-05 11:25:01+00:00') \ + .with_payload_value('deleted_at', '2017-02-05T11:44:36.000000') \ + .with_context_value(notification.NotificationMessage.RETRY_COUNTER, 3) \ + .build() + + self.assertFalse(self.filter.ignore_notification(message)) diff --git a/almanach/tests/unit/collector/filters/test_errored_instance_filter.py b/almanach/tests/unit/collector/filters/test_errored_instance_filter.py new file mode 100644 index 0000000..95682f8 --- /dev/null +++ b/almanach/tests/unit/collector/filters/test_errored_instance_filter.py @@ -0,0 +1,41 @@ +# Copyright 2017 Internap. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from almanach.collector.filters import errored_instance_filter + +from almanach.tests.unit import base +from almanach.tests.unit.builders import notification as builder + + +class TestErroredInstanceFilter(base.BaseTestCase): + + def setUp(self): + super(TestErroredInstanceFilter, self).setUp() + self.filter = errored_instance_filter.ErroredInstanceFilter() + + def test_instance_in_error_deleted(self): + notification = builder.InstanceNotificationBuilder() \ + .with_event_type('compute.instance.delete.end') \ + .with_payload_value('state', 'error') \ + .build() + + self.assertTrue(self.filter.ignore_notification(notification)) + + def test_active_instance_deleted(self): + notification = builder.InstanceNotificationBuilder() \ + .with_event_type('compute.instance.delete.end') \ + .with_payload_value('state', 'active') \ + .build() + + self.assertFalse(self.filter.ignore_notification(notification)) diff --git a/almanach/tests/unit/collector/handlers/test_instance_handler.py b/almanach/tests/unit/collector/handlers/test_instance_handler.py index a3e2e22..dacd172 100644 --- a/almanach/tests/unit/collector/handlers/test_instance_handler.py +++ b/almanach/tests/unit/collector/handlers/test_instance_handler.py @@ -25,7 +25,9 @@ class TestInstanceHandler(base.BaseTestCase): def setUp(self): super(TestInstanceHandler, self).setUp() self.controller = mock.Mock() - self.instance_handler = instance_handler.InstanceHandler(self.controller) + self.on_delete_filter = mock.Mock() + self.on_delete_filter.ignore_notification.return_value = False + self.instance_handler = instance_handler.InstanceHandler(self.controller, self.on_delete_filter) def test_instance_created(self): notification = builder.InstanceNotificationBuilder()\ @@ -77,6 +79,8 @@ class TestInstanceHandler(base.BaseTestCase): ) def test_instance_in_error_deleted(self): + self.on_delete_filter.ignore_notification.return_value = True + notification = builder.InstanceNotificationBuilder() \ .with_event_type('compute.instance.delete.end') \ .with_payload_value('terminated_at', 'a_date') \ diff --git a/almanach/tests/unit/collector/test_notification_filter.py b/almanach/tests/unit/collector/test_notification_filter.py new file mode 100644 index 0000000..ca554c5 --- /dev/null +++ b/almanach/tests/unit/collector/test_notification_filter.py @@ -0,0 +1,52 @@ +# Copyright 2017 Internap. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock + +from almanach.collector import notification + +from almanach.tests.unit import base + + +class TestNotificationFilter(base.BaseTestCase): + + def setUp(self): + super(TestNotificationFilter, self).setUp() + self.filter = notification.NotificationFilter() + + def test_when_one_filter_returns_true(self): + notification = mock.Mock() + filter1 = mock.Mock() + filter2 = mock.Mock() + + self.filter.add(filter1) + self.filter.add(filter2) + + filter1.ignore_notification.return_value = False + filter2.ignore_notification.return_value = True + + self.assertTrue(self.filter.ignore_notification(notification)) + + def test_when_all_filters_returns_false(self): + notification = mock.Mock() + filter1 = mock.Mock() + filter2 = mock.Mock() + + self.filter.add(filter1) + self.filter.add(filter2) + + filter1.ignore_notification.return_value = False + filter2.ignore_notification.return_value = False + + self.assertFalse(self.filter.ignore_notification(notification)) diff --git a/almanach/tests/unit/collector/test_notification.py b/almanach/tests/unit/collector/test_notification_handler.py similarity index 97% rename from almanach/tests/unit/collector/test_notification.py rename to almanach/tests/unit/collector/test_notification_handler.py index 00699df..e8fe659 100644 --- a/almanach/tests/unit/collector/test_notification.py +++ b/almanach/tests/unit/collector/test_notification_handler.py @@ -18,10 +18,10 @@ from almanach.collector import notification from almanach.tests.unit import base -class TestNotification(base.BaseTestCase): +class TestNotificationHandler(base.BaseTestCase): def setUp(self): - super(TestNotification, self).setUp() + super(TestNotificationHandler, self).setUp() self.config_fixture.config(retry_delay=0, group='collector') self.config_fixture.config(max_retries=3, group='collector') diff --git a/almanach/tests/unit/collector/test_service.py b/almanach/tests/unit/collector/test_service.py index 9a9ea28..8e37876 100644 --- a/almanach/tests/unit/collector/test_service.py +++ b/almanach/tests/unit/collector/test_service.py @@ -14,7 +14,12 @@ import mock +from almanach.collector.handlers import instance_handler +from almanach.collector.handlers import volume_handler +from almanach.collector.handlers import volume_type_handler +from almanach.collector import notification from almanach.collector import service + from almanach.tests.unit import base @@ -32,3 +37,19 @@ class TestServiceFactory(base.BaseTestCase): self.core_factory.get_instance_controller.assert_called_once() self.core_factory.get_volume_controller.assert_called_once() self.core_factory.get_volume_type_controller.assert_called_once() + + def test_get_instance_handler(self): + self.assertIsInstance(self.factory.get_instance_handler(), + instance_handler.InstanceHandler) + + def test_get_volume_handler(self): + self.assertIsInstance(self.factory.get_volume_handler(), + volume_handler.VolumeHandler) + + def test_get_volume_type_handler(self): + self.assertIsInstance(self.factory.get_volume_type_handler(), + volume_type_handler.VolumeTypeHandler) + + def test_get_on_delete_filters(self): + self.assertIsInstance(self.factory.get_on_delete_filters(), + notification.NotificationFilter) diff --git a/almanach/tests/unit/core/helpers/__init__.py b/almanach/tests/unit/core/helpers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/almanach/tests/unit/core/helpers/test_date_helper.py b/almanach/tests/unit/core/helpers/test_date_helper.py new file mode 100644 index 0000000..5d48588 --- /dev/null +++ b/almanach/tests/unit/core/helpers/test_date_helper.py @@ -0,0 +1,65 @@ +# Copyright 2017 Internap. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime +import pytz + +from almanach.core.helpers import date_helper + +from almanach.tests.unit import base + + +class TestDateHelper(base.BaseTestCase): + + def setUp(self): + super(TestDateHelper, self).setUp() + self.helper = date_helper.DateHelper() + + def test_parser_with_datetime_object(self): + original_datetime = datetime.now(tz=pytz.utc) + d = self.helper.parse(original_datetime) + self.assertEqual(original_datetime, d) + + def test_parser_created_at_string(self): + d = self.helper.parse('2017-02-15 05:22:57+00:00') + self.assertIsInstance(d, datetime) + self.assertEqual(d.day, 15) + self.assertEqual(d.month, 2) + self.assertEqual(d.year, 2017) + self.assertEqual(d.hour, 5) + self.assertEqual(d.minute, 22) + self.assertEqual(d.second, 57) + self.assertEqual(d.tzinfo, pytz.utc) + + def test_parser_deleted_at_string(self): + d = self.helper.parse('2017-02-05T11:34:36.000000') + self.assertIsInstance(d, datetime) + self.assertEqual(d.day, 5) + self.assertEqual(d.month, 2) + self.assertEqual(d.year, 2017) + self.assertEqual(d.hour, 11) + self.assertEqual(d.minute, 34) + self.assertEqual(d.second, 36) + self.assertEqual(d.tzinfo, pytz.utc) + + def test_is_whithin_range(self): + d1 = self.helper.parse('2017-02-15 05:22:57+00:00') + d2 = self.helper.parse('2017-02-15T05:30:04.000000') + self.assertFalse(self.helper.is_within_range(d1, d2, 30)) + self.assertTrue(self.helper.is_within_range(d1, d2, 600)) + + d2 = self.helper.parse('2017-02-15 05:22:57+00:00') + d1 = self.helper.parse('2017-02-15T05:30:04.000000') + self.assertFalse(self.helper.is_within_range(d1, d2, 30)) + self.assertTrue(self.helper.is_within_range(d1, d2, 600))