Handle instances deleted before completion

This change avoid sending notification to critical queue
when instances are deleted before their completion.

Since we are never going to receive the create.end event,
we use a 15 minutes window to assume the instance was never
created. This check is made only if the entity is not found.

- Add more flexible filters system
- For now, filters are only for on delete events
- Add date helper to manipulate and parse date

Change-Id: Iba97f050bf6e164bf0d83035a884cdbe023d92fc
This commit is contained in:
Frédéric Guillot 2017-02-16 09:39:06 -05:00
parent 45e76f87d4
commit 262586195d
19 changed files with 430 additions and 17 deletions

View File

View File

@ -0,0 +1,24 @@
# Copyright 2017 Internap.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class BaseFilter(object):
@abc.abstractmethod
def ignore_notification(self, notification):
pass

View File

@ -0,0 +1,51 @@
# Copyright 2017 Internap.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
from almanach.collector.filters import base_filter
from almanach.core.helpers import date_helper
LOG = logging.getLogger(__name__)
class DeleteInstanceBeforeCompletionFilter(base_filter.BaseFilter):
def __init__(self, config):
self.config = config
def ignore_notification(self, notification):
delta = self.config.entities.instance_existence_threshold
if self._has_been_retried(notification) \
and self._is_instance_deleted(notification) \
and self._was_never_created_successfully(notification, delta):
LOG.info('Instance %s was never created successfully during %d seconds',
notification.payload.get('instance_id'),
delta)
return True
return False
@staticmethod
def _is_instance_deleted(notification):
return notification.payload.get('state') == 'deleted'
def _has_been_retried(self, notification):
return notification.get_retry_counter() >= self.config.collector.min_retries
@staticmethod
def _was_never_created_successfully(notification, delta):
creation_date = date_helper.DateHelper().parse(notification.payload.get('created_at'))
deletion_date = date_helper.DateHelper().parse(notification.payload.get('deleted_at'))
return date_helper.DateHelper().is_within_range(creation_date, deletion_date, delta)

View File

@ -0,0 +1,29 @@
# Copyright 2017 Internap.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
from almanach.collector.filters import base_filter
LOG = logging.getLogger(__name__)
class ErroredInstanceFilter(base_filter.BaseFilter):
def ignore_notification(self, notification):
if notification.payload.get('state') == 'error':
LOG.info('Instance deletion event ignored because instance %s was badly created',
notification.payload.get('instance_id'))
return True
return False

View File

@ -22,9 +22,10 @@ LOG = logging.getLogger(__name__)
class InstanceHandler(base_handler.BaseHandler):
def __init__(self, controller):
def __init__(self, controller, on_delete_filter):
super(InstanceHandler, self).__init__()
self.controller = controller
self.on_delete_filter = on_delete_filter
def handle_events(self, notification):
if notification.event_type == "compute.instance.create.end":
@ -54,10 +55,7 @@ class InstanceHandler(base_handler.BaseHandler):
try:
self.controller.delete_instance(instance_id, date)
except exception.EntityNotFoundException as e:
if notification.payload.get('state') == 'error':
LOG.info('Instance deletion event ignored because instance %s was badly created',
instance_id)
else:
if not self.on_delete_filter.ignore_notification(notification):
raise e
def _on_instance_resized(self, notification):

View File

@ -19,6 +19,18 @@ from oslo_log import log as logging
LOG = logging.getLogger(__name__)
class NotificationFilter(object):
def __init__(self):
self.filters = []
def add(self, notification_filter):
self.filters.append(notification_filter)
def ignore_notification(self, notification):
return any([f.ignore_notification(notification) for f in self.filters])
class NotificationMessage(object):
RETRY_COUNTER = 'retry_count'

View File

@ -15,6 +15,8 @@
from oslo_log import log as logging
from oslo_service import service
from almanach.collector.filters import delete_instance_before_completion_filter
from almanach.collector.filters import errored_instance_filter
from almanach.collector.handlers import instance_handler
from almanach.collector.handlers import volume_handler
from almanach.collector.handlers import volume_type_handler
@ -57,18 +59,26 @@ class ServiceFactory(object):
messaging_factory = messaging.MessagingFactory(self.config)
notification_handler = notification.NotificationHandler(self.config, messaging_factory)
notification_handler.add_event_handler(self._get_instance_handler())
notification_handler.add_event_handler(self._get_volume_handler())
notification_handler.add_event_handler(self._get_volume_type_handler())
notification_handler.add_event_handler(self.get_instance_handler())
notification_handler.add_event_handler(self.get_volume_handler())
notification_handler.add_event_handler(self.get_volume_type_handler())
listeners = messaging_factory.get_listeners(notification_handler)
return CollectorService(listeners)
def _get_instance_handler(self):
return instance_handler.InstanceHandler(self.core_factory.get_instance_controller())
def get_instance_handler(self):
return instance_handler.InstanceHandler(
self.core_factory.get_instance_controller(),
self.get_on_delete_filters())
def _get_volume_handler(self):
def get_volume_handler(self):
return volume_handler.VolumeHandler(self.core_factory.get_volume_controller())
def _get_volume_type_handler(self):
def get_volume_type_handler(self):
return volume_type_handler.VolumeTypeHandler(self.core_factory.get_volume_type_controller())
def get_on_delete_filters(self):
filters = notification.NotificationFilter()
filters.add(errored_instance_filter.ErroredInstanceFilter())
filters.add(delete_instance_before_completion_filter.DeleteInstanceBeforeCompletionFilter(self.config))
return filters

View File

View File

@ -0,0 +1,42 @@
# Copyright 2017 Internap.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
from dateutil import parser
import pytz
from almanach.core import exception
class DateHelper(object):
@staticmethod
def parse(value):
try:
dt = value if isinstance(value, datetime) else parser.parse(value)
return DateHelper._normalize_timezone(dt)
except (TypeError, ValueError):
raise exception.DateFormatException()
@staticmethod
def is_within_range(d1, d2, delta):
diff = (d2 - d1).total_seconds()
return abs(diff) < delta
@staticmethod
def _normalize_timezone(dt):
try:
return dt.astimezone(pytz.utc)
except ValueError:
return dt.replace(tzinfo=pytz.utc)

View File

@ -46,10 +46,13 @@ collector_opts = [
default='almanach',
help='AMQP topic used for OpenStack notifications'),
cfg.IntOpt('max_retries',
default=5,
help='Number of retries before to send message to critical queue'),
cfg.IntOpt('min_retries',
default=3,
help='Maximal number of message retries'),
help='Number of retries before to use filters to discard notifications'),
cfg.IntOpt('retry_delay',
default=10,
default=25,
help='Delay in seconds between retries'),
]
@ -87,6 +90,9 @@ auth_opts = [
]
entity_opts = [
cfg.IntOpt('instance_existence_threshold',
default=900,
help='Instance existence threshold'),
cfg.IntOpt('volume_existence_threshold',
default=60,
help='Volume existence threshold'),

View File

@ -0,0 +1,58 @@
# Copyright 2017 Internap.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from almanach.collector.filters import delete_instance_before_completion_filter
from almanach.collector import notification
from almanach.tests.unit import base
from almanach.tests.unit.builders import notification as builder
class TestDeleteInstanceBeforeCompletionFilter(base.BaseTestCase):
def setUp(self):
super(TestDeleteInstanceBeforeCompletionFilter, self).setUp()
self.filter = delete_instance_before_completion_filter.DeleteInstanceBeforeCompletionFilter(self.config)
def test_with_instance_deleted_quickly(self):
message = builder.InstanceNotificationBuilder() \
.with_event_type('compute.instance.delete.end') \
.with_payload_value('state', 'deleted') \
.with_payload_value('created_at', '2017-02-05 11:25:01+00:00') \
.with_payload_value('deleted_at', '2017-02-05T11:34:36.000000') \
.with_context_value(notification.NotificationMessage.RETRY_COUNTER, 3) \
.build()
self.assertTrue(self.filter.ignore_notification(message))
def test_when_message_was_never_retried(self):
message = builder.InstanceNotificationBuilder() \
.with_event_type('compute.instance.delete.end') \
.with_payload_value('state', 'deleted') \
.with_payload_value('created_at', '2017-02-05 11:25:01+00:00') \
.with_payload_value('deleted_at', '2017-02-05T11:34:36.000000') \
.build()
self.assertFalse(self.filter.ignore_notification(message))
def test_with_instance_deleted_later(self):
message = builder.InstanceNotificationBuilder() \
.with_event_type('compute.instance.delete.end') \
.with_payload_value('state', 'deleted') \
.with_payload_value('created_at', '2017-02-05 11:25:01+00:00') \
.with_payload_value('deleted_at', '2017-02-05T11:44:36.000000') \
.with_context_value(notification.NotificationMessage.RETRY_COUNTER, 3) \
.build()
self.assertFalse(self.filter.ignore_notification(message))

View File

@ -0,0 +1,41 @@
# Copyright 2017 Internap.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from almanach.collector.filters import errored_instance_filter
from almanach.tests.unit import base
from almanach.tests.unit.builders import notification as builder
class TestErroredInstanceFilter(base.BaseTestCase):
def setUp(self):
super(TestErroredInstanceFilter, self).setUp()
self.filter = errored_instance_filter.ErroredInstanceFilter()
def test_instance_in_error_deleted(self):
notification = builder.InstanceNotificationBuilder() \
.with_event_type('compute.instance.delete.end') \
.with_payload_value('state', 'error') \
.build()
self.assertTrue(self.filter.ignore_notification(notification))
def test_active_instance_deleted(self):
notification = builder.InstanceNotificationBuilder() \
.with_event_type('compute.instance.delete.end') \
.with_payload_value('state', 'active') \
.build()
self.assertFalse(self.filter.ignore_notification(notification))

View File

@ -25,7 +25,9 @@ class TestInstanceHandler(base.BaseTestCase):
def setUp(self):
super(TestInstanceHandler, self).setUp()
self.controller = mock.Mock()
self.instance_handler = instance_handler.InstanceHandler(self.controller)
self.on_delete_filter = mock.Mock()
self.on_delete_filter.ignore_notification.return_value = False
self.instance_handler = instance_handler.InstanceHandler(self.controller, self.on_delete_filter)
def test_instance_created(self):
notification = builder.InstanceNotificationBuilder()\
@ -77,6 +79,8 @@ class TestInstanceHandler(base.BaseTestCase):
)
def test_instance_in_error_deleted(self):
self.on_delete_filter.ignore_notification.return_value = True
notification = builder.InstanceNotificationBuilder() \
.with_event_type('compute.instance.delete.end') \
.with_payload_value('terminated_at', 'a_date') \

View File

@ -0,0 +1,52 @@
# Copyright 2017 Internap.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from almanach.collector import notification
from almanach.tests.unit import base
class TestNotificationFilter(base.BaseTestCase):
def setUp(self):
super(TestNotificationFilter, self).setUp()
self.filter = notification.NotificationFilter()
def test_when_one_filter_returns_true(self):
notification = mock.Mock()
filter1 = mock.Mock()
filter2 = mock.Mock()
self.filter.add(filter1)
self.filter.add(filter2)
filter1.ignore_notification.return_value = False
filter2.ignore_notification.return_value = True
self.assertTrue(self.filter.ignore_notification(notification))
def test_when_all_filters_returns_false(self):
notification = mock.Mock()
filter1 = mock.Mock()
filter2 = mock.Mock()
self.filter.add(filter1)
self.filter.add(filter2)
filter1.ignore_notification.return_value = False
filter2.ignore_notification.return_value = False
self.assertFalse(self.filter.ignore_notification(notification))

View File

@ -18,10 +18,10 @@ from almanach.collector import notification
from almanach.tests.unit import base
class TestNotification(base.BaseTestCase):
class TestNotificationHandler(base.BaseTestCase):
def setUp(self):
super(TestNotification, self).setUp()
super(TestNotificationHandler, self).setUp()
self.config_fixture.config(retry_delay=0, group='collector')
self.config_fixture.config(max_retries=3, group='collector')

View File

@ -14,7 +14,12 @@
import mock
from almanach.collector.handlers import instance_handler
from almanach.collector.handlers import volume_handler
from almanach.collector.handlers import volume_type_handler
from almanach.collector import notification
from almanach.collector import service
from almanach.tests.unit import base
@ -32,3 +37,19 @@ class TestServiceFactory(base.BaseTestCase):
self.core_factory.get_instance_controller.assert_called_once()
self.core_factory.get_volume_controller.assert_called_once()
self.core_factory.get_volume_type_controller.assert_called_once()
def test_get_instance_handler(self):
self.assertIsInstance(self.factory.get_instance_handler(),
instance_handler.InstanceHandler)
def test_get_volume_handler(self):
self.assertIsInstance(self.factory.get_volume_handler(),
volume_handler.VolumeHandler)
def test_get_volume_type_handler(self):
self.assertIsInstance(self.factory.get_volume_type_handler(),
volume_type_handler.VolumeTypeHandler)
def test_get_on_delete_filters(self):
self.assertIsInstance(self.factory.get_on_delete_filters(),
notification.NotificationFilter)

View File

@ -0,0 +1,65 @@
# Copyright 2017 Internap.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
import pytz
from almanach.core.helpers import date_helper
from almanach.tests.unit import base
class TestDateHelper(base.BaseTestCase):
def setUp(self):
super(TestDateHelper, self).setUp()
self.helper = date_helper.DateHelper()
def test_parser_with_datetime_object(self):
original_datetime = datetime.now(tz=pytz.utc)
d = self.helper.parse(original_datetime)
self.assertEqual(original_datetime, d)
def test_parser_created_at_string(self):
d = self.helper.parse('2017-02-15 05:22:57+00:00')
self.assertIsInstance(d, datetime)
self.assertEqual(d.day, 15)
self.assertEqual(d.month, 2)
self.assertEqual(d.year, 2017)
self.assertEqual(d.hour, 5)
self.assertEqual(d.minute, 22)
self.assertEqual(d.second, 57)
self.assertEqual(d.tzinfo, pytz.utc)
def test_parser_deleted_at_string(self):
d = self.helper.parse('2017-02-05T11:34:36.000000')
self.assertIsInstance(d, datetime)
self.assertEqual(d.day, 5)
self.assertEqual(d.month, 2)
self.assertEqual(d.year, 2017)
self.assertEqual(d.hour, 11)
self.assertEqual(d.minute, 34)
self.assertEqual(d.second, 36)
self.assertEqual(d.tzinfo, pytz.utc)
def test_is_whithin_range(self):
d1 = self.helper.parse('2017-02-15 05:22:57+00:00')
d2 = self.helper.parse('2017-02-15T05:30:04.000000')
self.assertFalse(self.helper.is_within_range(d1, d2, 30))
self.assertTrue(self.helper.is_within_range(d1, d2, 600))
d2 = self.helper.parse('2017-02-15 05:22:57+00:00')
d1 = self.helper.parse('2017-02-15T05:30:04.000000')
self.assertFalse(self.helper.is_within_range(d1, d2, 30))
self.assertTrue(self.helper.is_within_range(d1, d2, 600))