Implement _process_unfinished_notifications periodic tasks

Added _process_unfinished_notifications to process notifications
which are in error or new state. This periodic task will execute at
regular interval defined by new config option
'process_unfinished_notifications_interval' defaults to 120 seconds.

The notifications which are in ‘new’ status will be picked up based
on a new config option ‘retry_notification_new_status_interval’ defaults
to 60 seconds.

Implements: bp add-periodic-tasks
Change-Id: I6e607d83f04618ad695a9614f84ad690b8804848
This commit is contained in:
Abhishek Kekane 2016-11-25 10:48:49 +05:30
parent 7645ab489d
commit 6db17bc33a
5 changed files with 67 additions and 39 deletions

View File

@ -113,6 +113,7 @@ function configure_masakari {
# Set os_privileged_user credentials (used for connecting nova service)
iniset $MASAKARI_CONF DEFAULT os_privileged_user_name nova
iniset $MASAKARI_CONF DEFAULT os_privileged_user_auth_url "${KEYSTONE_AUTH_PROTOCOL}://${KEYSTONE_AUTH_HOST}/identity_admin"
iniset $MASAKARI_CONF DEFAULT os_privileged_user_password "$SERVICE_PASSWORD"
iniset $MASAKARI_CONF DEFAULT os_privileged_user_tenant "$SERVICE_PROJECT_NAME"
iniset $MASAKARI_CONF DEFAULT graceful_shutdown_timeout "$SERVICE_GRACEFUL_SHUTDOWN_TIMEOUT"

View File

@ -19,7 +19,6 @@ Handles all requests to Nova.
import functools
import sys
from keystoneauth1.access import service_catalog
from keystoneauth1 import exceptions as keystone_exception
import keystoneauth1.loading
import keystoneauth1.session
@ -88,17 +87,9 @@ def novaclient(context, timeout=None):
@param timeout: Number of seconds to wait for an answer before raising a
Timeout exception (None to disable)
"""
sc = context.service_catalog or []
nova_catalog_info = CONF.nova_catalog_admin_info
service_type, service_name, endpoint_type = nova_catalog_info.split(':')
# Extract the region if set in configuration
if CONF.os_region_name:
region_filter = {'region_name': CONF.os_region_name}
else:
region_filter = {}
context = ctx.RequestContext(
CONF.os_privileged_user_name, None,
auth_token=CONF.os_privileged_user_password,
@ -107,22 +98,7 @@ def novaclient(context, timeout=None):
# User needs to authenticate to Keystone before querying Nova, so we set
# auth_url to the identity service endpoint
if CONF.os_privileged_user_auth_url:
url = CONF.os_privileged_user_auth_url
else:
# We then pass region_name, endpoint_type, etc. to the
# Client() constructor so that the final endpoint is
# chosen correctly.
try:
url = service_catalog.ServiceCatalogV2(sc).url_for(
service_type='identity',
interface=endpoint_type,
**region_filter)
except keystone_exception.EndpointNotFound:
url = service_catalog.ServiceCatalogV3(sc).url_for(
service_type='identity',
interface=endpoint_type,
**region_filter)
url = CONF.os_privileged_user_auth_url
LOG.debug('Creating a Nova client using "%s" user',
CONF.os_privileged_user_name)

View File

@ -77,6 +77,18 @@ notification_opts = [
cfg.IntOpt('wait_period_after_power_on',
default=60,
help='Number of seconds to wait for instance to start'),
cfg.IntOpt('process_unfinished_notifications_interval',
default=120,
help='Interval in seconds for processing notifications which '
'are in error or new state.'),
cfg.IntOpt('retry_notification_new_status_interval',
default=60,
help="Interval in seconds for identifying notifications which "
"are in new state. If the notification is in new state "
"till this config option value after it's "
"generated_time, then it is considered that notification "
"is ignored by the messaging queue and will be processed "
"by 'process_unfinished_notifications' periodic task."),
]

View File

@ -24,11 +24,13 @@ workflows.
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_service import periodic_task
from oslo_utils import timeutils
import masakari.conf
from masakari.engine import driver
from masakari import exception
from masakari.i18n import _LI, _LW
from masakari.i18n import _LE, _LI, _LW
from masakari import manager
from masakari import objects
from masakari.objects import fields
@ -152,8 +154,7 @@ class MasakariManager(manager.Manager):
return notification_status
def process_notification(self, context, notification=None):
"""Processes the notification"""
def _process_notification(self, context, notification):
@utils.synchronized(notification.source_host_uuid)
def do_process_notification(notification):
LOG.info(_LI('Processing notification %(notification_uuid)s of '
@ -191,3 +192,46 @@ class MasakariManager(manager.Manager):
notification.save()
do_process_notification(notification)
def process_notification(self, context, notification=None):
"""Processes the notification"""
self._process_notification(context, notification)
@periodic_task.periodic_task(
spacing=CONF.process_unfinished_notifications_interval)
def _process_unfinished_notifications(self, context):
filters = {
'status': [fields.NotificationStatus.ERROR,
fields.NotificationStatus.NEW]
}
notifications_list = objects.NotificationList.get_all(context,
filters=filters)
for notification in notifications_list:
if (notification.status == fields.NotificationStatus.ERROR or
(notification.status == fields.NotificationStatus.NEW and
timeutils.is_older_than(
notification.generated_time,
CONF.retry_notification_new_status_interval))):
self._process_notification(context, notification)
# get updated notification from db after workflow execution
notification_db = objects.Notification.get_by_uuid(
context, notification.notification_uuid)
if notification_db.status == fields.NotificationStatus.ERROR:
# update notification status as failed
notification_status = fields.NotificationStatus.FAILED
update_data = {
'status': notification_status
}
notification_db.update(update_data)
notification_db.save()
LOG.error(_LE(
"Periodic task 'process_unfinished_notifications': "
"Notification %(notification_uuid)s exits with "
"status: %(status)s."), {
'notification_uuid': notification.notification_uuid,
'status': notification_status
})

View File

@ -39,6 +39,8 @@ class NovaClientTestCase(test.TestCase):
self.override_config('os_privileged_user_name', 'adminuser')
self.override_config('os_privileged_user_password', 'strongpassword')
self.override_config('os_privileged_user_auth_url',
'http://keystonehost/identity_admin')
@mock.patch('novaclient.api_versions.APIVersion')
@mock.patch('novaclient.client.Client')
@ -48,7 +50,7 @@ class NovaClientTestCase(test.TestCase):
p_client, p_api_version):
nova.novaclient(self.ctx)
p_plugin_loader.return_value.load_from_options.assert_called_once_with(
auth_url='http://keystonehost:5000/v2.0',
auth_url='http://keystonehost/identity_admin',
password='strongpassword', project_name=None, username='adminuser'
)
p_client.assert_called_once_with(
@ -65,7 +67,7 @@ class NovaClientTestCase(test.TestCase):
p_client, p_api_version):
nova.novaclient(self.ctx)
p_plugin_loader.return_value.load_from_options.assert_called_once_with(
auth_url='http://keystonehost:5000/v2.0',
auth_url='http://keystonehost/identity_admin',
password='strongpassword', project_name=None, username='adminuser'
)
p_client.assert_called_once_with(
@ -82,11 +84,9 @@ class NovaClientTestCase(test.TestCase):
p_plugin_loader,
p_client,
p_api_version):
self.override_config('os_privileged_user_auth_url',
'http://privatekeystonehost:5000/v2.0')
nova.novaclient(self.ctx)
p_plugin_loader.return_value.load_from_options.assert_called_once_with(
auth_url='http://privatekeystonehost:5000/v2.0',
auth_url='http://keystonehost/identity_admin',
password='strongpassword', project_name=None, username='adminuser'
)
p_client.assert_called_once_with(
@ -97,19 +97,14 @@ class NovaClientTestCase(test.TestCase):
@mock.patch('novaclient.api_versions.APIVersion')
@mock.patch('novaclient.client.Client')
@mock.patch('keystoneauth1.access.service_catalog.ServiceCatalogV2.'
'url_for')
@mock.patch('keystoneauth1.access.service_catalog.ServiceCatalogV3.'
'url_for')
@mock.patch('keystoneauth1.loading.get_plugin_loader')
@mock.patch('keystoneauth1.session.Session')
def test_nova_client_custom_region(self, p_session, p_plugin_loader,
p_catalogv3, p_catalogv2,
p_client, p_api_version):
self.override_config('os_region_name', 'farfaraway')
nova.novaclient(self.ctx)
p_plugin_loader.return_value.load_from_options.assert_called_once_with(
auth_url=p_catalogv2() or p_catalogv3(),
auth_url='http://keystonehost/identity_admin',
password='strongpassword', project_name=None, username='adminuser'
)
p_client.assert_called_once_with(