check expired notifications

Occasionally, there would be notifications which will remain 'new',
'error' or 'running' status all times, and not to be processed again.
Due to this, operator can not update the segment or host.

This patch add one task to periodically check unfinished notifications.
If one unfinished notification is expired, just set its status to
'failed'.

Close-Bug: #1773765
Change-Id: If49635639dd976aeec3ea73e702ad2636fcf1e0a
This commit is contained in:
suzhengwei 2020-04-17 08:59:35 +08:00
parent 9a59610643
commit 3a4f782441
3 changed files with 51 additions and 2 deletions

View File

@ -97,6 +97,13 @@ notification_opts = [
"generated_time, then it is considered that notification "
"is ignored by the messaging queue and will be processed "
"by 'process_unfinished_notifications' periodic task."),
cfg.IntOpt('check_expired_notifications_interval',
default=600,
help='Interval in seconds for checking running notifications.'),
cfg.IntOpt('notifications_expired_interval',
default=86400,
help='Interval in seconds for identifying running '
'notifications expired.'),
cfg.IntOpt('host_failure_recovery_threads',
default=3,
min=1,

View File

@ -368,6 +368,34 @@ class MasakariManager(manager.Manager):
{'notification_uuid': notification.notification_uuid,
'status': notification_status})
@periodic_task.periodic_task(
spacing=CONF.check_expired_notifications_interval)
def _check_expired_notifications(self, context):
filters = {
'status': [fields.NotificationStatus.RUNNING,
fields.NotificationStatus.ERROR,
fields.NotificationStatus.NEW]
}
notifications_list = objects.NotificationList.get_all(context,
filters=filters)
for notification in notifications_list:
if timeutils.is_older_than(
notification.generated_time,
CONF.notifications_expired_interval):
# update running expired notification status as failed
notification_status = fields.NotificationStatus.FAILED
update_data = {
'status': notification_status
}
notification.update(update_data)
notification.save()
LOG.error(
"Periodic task 'check_expired_notifications': "
"Notification %(notification_uuid)s is expired.",
{'notification_uuid': notification.notification_uuid})
def get_notification_recovery_workflow_details(self, context,
notification):
"""Retrieve recovery workflow details of the notification"""

View File

@ -12,6 +12,8 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from unittest import mock
from oslo_utils import importutils
@ -34,6 +36,8 @@ from masakari.tests import uuidsentinel
CONF = masakari.conf.CONF
NOW = timeutils.utcnow().replace(microsecond=0)
EXPIRED_TIME = timeutils.utcnow().replace(microsecond=0) \
- datetime.timedelta(seconds=CONF.notifications_expired_interval)
def _get_vm_type_notification(status="new"):
@ -69,14 +73,15 @@ class EngineManagerUnitTestCase(test.NoDBTestCase):
generated_time=NOW, status="new",
notification_uuid=uuidsentinel.fake_notification)
def _get_compute_host_type_notification(self):
def _get_compute_host_type_notification(self, expired=False):
return fakes.create_fake_notification(
type="COMPUTE_HOST", id=1, payload={
'event': 'stopped', 'host_status': 'NORMAL',
'cluster_status': 'ONLINE'
},
source_host_uuid=uuidsentinel.fake_host,
generated_time=NOW, status="new",
generated_time=EXPIRED_TIME if expired else NOW,
status="new",
notification_uuid=uuidsentinel.fake_notification)
@mock.patch("masakari.engine.drivers.taskflow."
@ -1150,3 +1155,12 @@ class EngineManagerUnitTestCase(test.NoDBTestCase):
mock_progress_details.assert_called_once_with(
self.context, notification)
@mock.patch.object(notification_obj.Notification, "save")
@mock.patch.object(notification_obj.NotificationList, "get_all")
def test_check_expired_notifications(self, mock_get_all, mock_save,
mock_notification_get):
notification = self._get_compute_host_type_notification(expired=True)
mock_get_all.return_value = [notification]
self.engine._check_expired_notifications(self.context)
self.assertEqual("failed", notification.status)