Use futurist library for asynchronous tasks

A futurist green thread pool executor is now used instead of the custom
spawn_n helper for running asynchronous operations during introspection,
processing and aborting.
The existing periodic tasks are now run using Futurist PeriodicWorker.

The main shutdown procedure was split into a separate function for convenience.
Also updated example.conf to the latest version (including some pending updates
from third-party libraries).

Change-Id: Id0efa31aee68a80ec55e4136c53189484b452559
This commit is contained in:
Dmitry Tantsur 2016-02-29 13:39:31 +01:00
parent dcb65fc5a9
commit 5b02024cca
13 changed files with 97 additions and 134 deletions

View File

@ -50,6 +50,7 @@
#ssl_key_path =
# The green thread pool size. (integer value)
# Minimum value: 2
#max_concurrency = 1000
# Delay (in seconds) between two introspections. (integer value)
@ -84,20 +85,13 @@
# The name of a logging configuration file. This file is appended to
# any existing logging configuration files. For details about logging
# configuration files, see the Python logging module documentation.
# Note that when logging configuration files are used all logging
# configuration is defined in the configuration file and other logging
# configuration options are ignored (for example, log_format). (string
# value)
# Note that when logging configuration files are used then all logging
# configuration is set in the configuration file and other logging
# configuration options are ignored (for example,
# logging_context_format_string). (string value)
# Deprecated group/name - [DEFAULT]/log_config
#log_config_append = <None>
# DEPRECATED. A logging.Formatter log message format string which may
# use any of the available logging.LogRecord attributes. This option
# is deprecated. Please use logging_context_format_string and
# logging_default_format_string instead. This option is ignored if
# log_config_append is set. (string value)
#log_format = <None>
# Defines the format string for %%(asctime)s in log records. Default:
# %(default)s . This option is ignored if log_config_append is set.
# (string value)
@ -126,15 +120,6 @@
# log_config_append is set. (boolean value)
#use_syslog = false
# Enables or disables syslog rfc5424 format for logging. If enabled,
# prefixes the MSG part of the syslog message with APP-NAME (RFC5424).
# The format without the APP-NAME is deprecated in Kilo, and will be
# removed in Mitaka, along with this option. This option is ignored if
# log_config_append is set. (boolean value)
# This option is deprecated for removal.
# Its value may be silently ignored in the future.
#use_syslog_rfc_format = true
# Syslog facility to receive log lines. This option is ignored if
# log_config_append is set. (string value)
#syslog_log_facility = LOG_USER
@ -164,7 +149,7 @@
# List of package logging levels in logger=LEVEL pairs. This option is
# ignored if log_config_append is set. (list value)
#default_log_levels = amqp=WARN,amqplib=WARN,boto=WARN,qpid=WARN,sqlalchemy=WARN,suds=INFO,oslo.messaging=INFO,iso8601=WARN,requests.packages.urllib3.connectionpool=WARN,urllib3.connectionpool=WARN,websocket=WARN,requests.packages.urllib3.util.retry=WARN,urllib3.util.retry=WARN,keystonemiddleware=WARN,routes.middleware=WARN,stevedore=WARN,taskflow=WARN,keystoneauth=WARN
#default_log_levels = amqp=WARN,amqplib=WARN,boto=WARN,qpid=WARN,sqlalchemy=WARN,suds=INFO,oslo.messaging=INFO,iso8601=WARN,requests.packages.urllib3.connectionpool=WARN,urllib3.connectionpool=WARN,websocket=WARN,requests.packages.urllib3.util.retry=WARN,urllib3.util.retry=WARN,keystonemiddleware=WARN,routes.middleware=WARN,stevedore=WARN,taskflow=WARN,keystoneauth=WARN,oslo.cache=INFO,dogpile.core.dogpile=INFO
# Enables or disables publication of error events. (boolean value)
#publish_errors = false
@ -250,7 +235,7 @@
# value)
# Deprecated group/name - [DEFAULT]/sql_max_overflow
# Deprecated group/name - [DATABASE]/sqlalchemy_max_overflow
#max_overflow = <None>
#max_overflow = 50
# Verbosity of SQL debugging information: 0=None, 100=Everything.
# (integer value)

View File

@ -240,7 +240,7 @@ SERVICE_OPTS = [
default='',
help='Path to SSL key'),
cfg.IntOpt('max_concurrency',
default=1000,
default=1000, min=2,
help='The green thread pool size.'),
cfg.IntOpt('introspection_delay',
default=5,

View File

@ -100,9 +100,9 @@ def introspect(uuid, new_ipmi_credentials=None, token=None):
ironic=ironic)
node_info.set_option('new_ipmi_credentials', new_ipmi_credentials)
def _handle_exceptions():
def _handle_exceptions(fut):
try:
_background_introspect(ironic, node_info)
fut.result()
except utils.Error as exc:
# Logging has already happened in Error.__init__
node_info.finished(error=str(exc))
@ -111,7 +111,8 @@ def introspect(uuid, new_ipmi_credentials=None, token=None):
LOG.exception(msg, node_info=node_info)
node_info.finished(error=msg)
utils.spawn_n(_handle_exceptions)
future = utils.executor().submit(_background_introspect, ironic, node_info)
future.add_done_callback(_handle_exceptions)
def _background_introspect(ironic, node_info):
@ -196,7 +197,7 @@ def abort(uuid, token=None):
raise utils.Error(_('Node is locked, please, retry later'),
node_info=node_info, code=409)
utils.spawn_n(_abort, node_info, ironic)
utils.executor().submit(_abort, node_info, ironic)
def _abort(node_info, ironic):

View File

@ -21,6 +21,7 @@ import ssl
import sys
import flask
from futurist import periodics
from oslo_config import cfg
from oslo_log import log
from oslo_utils import uuidutils
@ -286,26 +287,23 @@ def handle_404(error):
return error_response(error, code=404)
def periodic_update(period): # pragma: no cover
while True:
LOG.debug('Running periodic update of filters')
try:
@periodics.periodic(spacing=CONF.firewall.firewall_update_period,
enabled=CONF.firewall.manage_firewall)
def periodic_update(): # pragma: no cover
try:
firewall.update_filters()
except Exception:
LOG.exception(_LE('Periodic update of firewall rules failed'))
@periodics.periodic(spacing=CONF.clean_up_period)
def periodic_clean_up(): # pragma: no cover
try:
if node_cache.clean_up():
firewall.update_filters()
except Exception:
LOG.exception(_LE('Periodic update failed'))
eventlet.greenthread.sleep(period)
def periodic_clean_up(period): # pragma: no cover
while True:
LOG.debug('Running periodic clean up of node cache')
try:
if node_cache.clean_up():
firewall.update_filters()
sync_with_ironic()
except Exception:
LOG.exception(_LE('Periodic clean up of node cache failed'))
eventlet.greenthread.sleep(period)
sync_with_ironic()
except Exception:
LOG.exception(_LE('Periodic clean up of node cache failed'))
def sync_with_ironic():
@ -316,7 +314,12 @@ def sync_with_ironic():
node_cache.delete_nodes_not_in_list(ironic_node_uuids)
_PERIODICS_WORKER = None
def init():
global _PERIODICS_WORKER
if utils.get_auth_strategy() != 'noauth':
utils.add_auth_middleware(app)
else:
@ -344,14 +347,29 @@ def init():
if CONF.firewall.manage_firewall:
firewall.init()
period = CONF.firewall.firewall_update_period
utils.spawn_n(periodic_update, period)
if CONF.timeout > 0:
period = CONF.clean_up_period
utils.spawn_n(periodic_clean_up, period)
else:
LOG.warning(_LW('Timeout is disabled in configuration'))
_PERIODICS_WORKER = periodics.PeriodicWorker(
callables=[(periodic_update, None, None),
(periodic_clean_up, None, None)],
executor_factory=periodics.ExistingExecutor(utils.executor()))
utils.executor().submit(_PERIODICS_WORKER.start)
def shutdown():
    """Tear down the service: firewall rules, periodic worker and executor."""
    global _PERIODICS_WORKER
    LOG.debug('Shutting down')
    firewall.clean_up()
    worker = _PERIODICS_WORKER
    if worker is not None:
        # Stop scheduling new periodic runs, then wait for in-flight ones.
        worker.stop()
        worker.wait()
        _PERIODICS_WORKER = None
    pool = utils.executor()
    if pool.alive:
        pool.shutdown(wait=True)
    LOG.info(_LI('Shut down successfully'))
def create_ssl_context():
@ -416,4 +434,4 @@ def main(args=sys.argv[1:]): # pragma: no cover
try:
app.run(**app_kwargs)
finally:
firewall.clean_up()
shutdown()

View File

@ -177,14 +177,14 @@ def _process_node(node, introspection_data, node_info):
if node_info.options.get('new_ipmi_credentials'):
new_username, new_password = (
node_info.options.get('new_ipmi_credentials'))
utils.spawn_n(_finish_set_ipmi_credentials,
ironic, node, node_info, introspection_data,
new_username, new_password)
utils.executor().submit(_finish_set_ipmi_credentials,
ironic, node, node_info, introspection_data,
new_username, new_password)
resp['ipmi_setup_credentials'] = True
resp['ipmi_username'] = new_username
resp['ipmi_password'] = new_password
else:
utils.spawn_n(_finish, ironic, node_info, introspection_data)
utils.executor().submit(_finish, ironic, node_info, introspection_data)
return resp

View File

@ -13,6 +13,7 @@
import unittest
import futurist
import mock
from oslo_concurrency import lockutils
from oslo_config import cfg
@ -25,6 +26,7 @@ from ironic_inspector import conf # noqa
from ironic_inspector import db
from ironic_inspector import node_cache
from ironic_inspector.plugins import base as plugins_base
from ironic_inspector import utils
CONF = cfg.CONF
@ -65,6 +67,7 @@ class BaseTest(unittest.TestCase):
patch.start()
# 'p=patch' magic is due to how closures work
self.addCleanup(lambda p=patch: p.stop())
utils._EXECUTOR = futurist.SynchronousExecutor(green=True)
def assertPatchEqual(self, expected, actual):
expected = sorted(expected, key=lambda p: p['path'])

View File

@ -49,8 +49,6 @@ class BaseTest(test_base.NodeTest):
@mock.patch.object(eventlet.greenthread, 'sleep', lambda _: None)
@mock.patch.object(utils, 'spawn_n',
lambda f, *a, **kw: f(*a, **kw) and None)
@mock.patch.object(firewall, 'update_filters', autospec=True)
@mock.patch.object(node_cache, 'add_node', autospec=True)
@mock.patch.object(ir_utils, 'get_client', autospec=True)
@ -334,8 +332,6 @@ class TestIntrospect(BaseTest):
self.assertEqual(42, introspect._LAST_INTROSPECTION_TIME)
@mock.patch.object(utils, 'spawn_n',
lambda f, *a, **kw: f(*a, **kw) and None)
@mock.patch.object(firewall, 'update_filters', autospec=True)
@mock.patch.object(node_cache, 'add_node', autospec=True)
@mock.patch.object(ir_utils, 'get_client', autospec=True)
@ -419,8 +415,6 @@ class TestSetIpmiCredentials(BaseTest):
new_ipmi_credentials=self.new_creds)
@mock.patch.object(utils, 'spawn_n',
lambda f, *a, **kw: f(*a, **kw) and None)
@mock.patch.object(firewall, 'update_filters', autospec=True)
@mock.patch.object(node_cache, 'get_node', autospec=True)
@mock.patch.object(ir_utils, 'get_client', autospec=True)

View File

@ -457,31 +457,30 @@ class TestPlugins(unittest.TestCase):
plugins_base.processing_hooks_manager())
@mock.patch.object(utils, 'spawn_n')
@mock.patch.object(firewall, 'init')
@mock.patch.object(utils, 'add_auth_middleware')
@mock.patch.object(ir_utils, 'get_client')
@mock.patch.object(db, 'init')
class TestInit(test_base.BaseTest):
def setUp(self):
super(TestInit, self).setUp()
# Tests default to a synchronous executor which can't be used here
utils._EXECUTOR = None
@mock.patch.object(firewall, 'clean_up', lambda: None)
def tearDown(self):
main.shutdown()
def test_ok(self, mock_node_cache, mock_get_client, mock_auth,
mock_firewall, mock_spawn_n):
mock_firewall):
CONF.set_override('auth_strategy', 'keystone')
main.init()
mock_auth.assert_called_once_with(main.app)
mock_node_cache.assert_called_once_with()
mock_firewall.assert_called_once_with()
spawn_n_expected_args = [
(main.periodic_update, CONF.firewall.firewall_update_period),
(main.periodic_clean_up, CONF.clean_up_period)]
spawn_n_call_args_list = mock_spawn_n.call_args_list
for (args, call) in zip(spawn_n_expected_args,
spawn_n_call_args_list):
self.assertEqual(args, call[0])
def test_init_without_authenticate(self, mock_node_cache, mock_get_client,
mock_auth, mock_firewall, mock_spawn_n):
mock_auth, mock_firewall):
CONF.set_override('auth_strategy', 'noauth')
main.init()
self.assertFalse(mock_auth.called)
@ -489,7 +488,7 @@ class TestInit(test_base.BaseTest):
@mock.patch.object(main.LOG, 'warning')
def test_init_with_no_data_storage(self, mock_log, mock_node_cache,
mock_get_client, mock_auth,
mock_firewall, mock_spawn_n):
mock_firewall):
msg = ('Introspection data will not be stored. Change '
'"[processing] store_data" option if this is not the '
'desired behavior')
@ -499,7 +498,7 @@ class TestInit(test_base.BaseTest):
@mock.patch.object(main.LOG, 'info')
def test_init_with_swift_storage(self, mock_log, mock_node_cache,
mock_get_client, mock_auth,
mock_firewall, mock_spawn_n):
mock_firewall):
CONF.set_override('store_data', 'swift', 'processing')
msg = mock.call('Introspection data will be stored in Swift in the '
'container %s', CONF.swift.container)
@ -508,33 +507,15 @@ class TestInit(test_base.BaseTest):
def test_init_without_manage_firewall(self, mock_node_cache,
mock_get_client, mock_auth,
mock_firewall, mock_spawn_n):
mock_firewall):
CONF.set_override('manage_firewall', False, 'firewall')
main.init()
self.assertFalse(mock_firewall.called)
spawn_n_expected_args = [
(main.periodic_clean_up, CONF.clean_up_period)]
spawn_n_call_args_list = mock_spawn_n.call_args_list
for (args, call) in zip(spawn_n_expected_args,
spawn_n_call_args_list):
self.assertEqual(args, call[0])
def test_init_with_timeout_0(self, mock_node_cache, mock_get_client,
mock_auth, mock_firewall, mock_spawn_n):
CONF.set_override('timeout', 0)
main.init()
spawn_n_expected_args = [
(main.periodic_update, CONF.firewall.firewall_update_period)]
spawn_n_call_args_list = mock_spawn_n.call_args_list
for (args, call) in zip(spawn_n_expected_args,
spawn_n_call_args_list):
self.assertEqual(args, call[0])
@mock.patch.object(main.LOG, 'critical')
def test_init_failed_processing_hook(self, mock_log, mock_node_cache,
mock_get_client, mock_auth,
mock_firewall, mock_spawn_n):
mock_firewall):
CONF.set_override('processing_hooks', 'foo!', 'processing')
plugins_base._HOOKS_MGR = None

View File

@ -240,8 +240,6 @@ class TestProcess(BaseTest):
hook_mock.assert_called_once_with(self.data)
@mock.patch.object(utils, 'spawn_n',
lambda f, *a: f(*a) and None)
@mock.patch.object(eventlet.greenthread, 'sleep', lambda _: None)
@mock.patch.object(example_plugin.ExampleProcessingHook, 'before_update')
@mock.patch.object(firewall, 'update_filters', autospec=True)
@ -371,8 +369,7 @@ class TestProcessNode(BaseTest):
self.node_info.set_option('new_ipmi_credentials', self.new_creds)
self.cli.node.get_boot_device.side_effect = RuntimeError('boom')
self.assertRaisesRegexp(utils.Error, 'Failed to validate',
self.call)
self.call()
self.cli.node.update.assert_any_call(self.uuid, self.patch_credentials)
self.assertEqual(2, self.cli.node.update.call_count)
@ -389,8 +386,7 @@ class TestProcessNode(BaseTest):
post_hook_mock):
self.cli.node.set_power_state.side_effect = RuntimeError('boom')
self.assertRaisesRegexp(utils.Error, 'Failed to power off',
self.call)
self.call()
self.cli.node.set_power_state.assert_called_once_with(self.uuid, 'off')
self.assertCalledWithPatch(self.patch_props, self.cli.node.update)

View File

@ -11,8 +11,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from keystonemiddleware import auth_token
from oslo_config import cfg
@ -106,27 +104,6 @@ class TestCheckAuth(base.BaseTest):
utils.check_auth(request)
class TestSpawnN(unittest.TestCase):
def setUp(self):
super(TestSpawnN, self).setUp()
utils.GREEN_POOL = None
@mock.patch('eventlet.greenpool.GreenPool', autospec=True)
def test_spawn_n(self, mock_green_pool):
greenpool = mock_green_pool.return_value
func = lambda x: x
utils.spawn_n(func, "hello")
self.assertEqual(greenpool, utils.GREEN_POOL)
greenpool.spawn_n.assert_called_with(func, "hello")
utils.spawn_n(func, "goodbye")
greenpool.spawn_n.assert_called_with(func, "goodbye")
mock_green_pool.assert_called_once_with(CONF.max_concurrency)
class TestProcessingLogger(base.BaseTest):
def test_prefix_no_info(self):
self.assertEqual('[unidentified node]',

View File

@ -14,7 +14,7 @@
import logging as pylog
import re
import eventlet
import futurist
from keystonemiddleware import auth_token
from oslo_config import cfg
from oslo_log import log
@ -25,7 +25,7 @@ from ironic_inspector import conf # noqa
CONF = cfg.CONF
GREEN_POOL = None
_EXECUTOR = None
def get_ipmi_address_from_data(introspection_data):
@ -116,11 +116,13 @@ class NotFoundInCacheError(Error):
super(NotFoundInCacheError, self).__init__(msg, code)
def spawn_n(*args, **kwargs):
global GREEN_POOL
if not GREEN_POOL:
GREEN_POOL = eventlet.greenpool.GreenPool(CONF.max_concurrency)
return GREEN_POOL.spawn_n(*args, **kwargs)
def executor():
    """Return the process-wide futures executor, creating it lazily."""
    global _EXECUTOR
    if _EXECUTOR is not None:
        return _EXECUTOR
    # First call: build a green thread pool sized by configuration.
    _EXECUTOR = futurist.GreenThreadPoolExecutor(
        max_workers=CONF.max_concurrency)
    return _EXECUTOR
def add_auth_middleware(app):

View File

@ -0,0 +1,5 @@
---
upgrade:
- Minimum possible value for the "max_concurrency" setting is now 2.
other:
- Switched to Futurist library for asynchronous tasks.

View File

@ -5,6 +5,7 @@ alembic>=0.8.0 # MIT
Babel>=1.3 # BSD
eventlet!=0.18.3,>=0.18.2 # MIT
Flask<1.0,>=0.10 # BSD
futurist>=0.11.0 # Apache-2.0
jsonpath-rw<2.0,>=1.2.0 # Apache-2.0
jsonschema!=2.5.0,<3.0.0,>=2.0.0 # MIT
keystonemiddleware!=4.1.0,>=4.0.0 # Apache-2.0