diff --git a/ironic_inspector/cmd/conductor.py b/ironic_inspector/cmd/conductor.py new file mode 100644 index 000000000..2f47e3285 --- /dev/null +++ b/ironic_inspector/cmd/conductor.py @@ -0,0 +1,42 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""The Ironic Inspector Conductor service.""" + +import sys + +from oslo_config import cfg +from oslo_service import service + +from ironic_inspector.common.i18n import _ +from ironic_inspector.common.rpc_service import RPCService +from ironic_inspector.common import service_utils + +CONF = cfg.CONF + + +def main(args=sys.argv[1:]): + # Parse config file and command line options, then start logging + service_utils.prepare_service(args) + + if CONF.standalone: + msg = _('To run ironic-inspector-conductor, [DEFAULT]standalone ' + 'should be set to False.') + sys.exit(msg) + + launcher = service.ServiceLauncher(CONF, restart_method='mutate') + launcher.launch_service(RPCService(CONF.host)) + launcher.wait() + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/ironic_inspector/cmd/wsgi.py b/ironic_inspector/cmd/wsgi.py new file mode 100644 index 000000000..76e245285 --- /dev/null +++ b/ironic_inspector/cmd/wsgi.py @@ -0,0 +1,34 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""WSGI script for Ironic Inspector API, installed by pbr.""" + +import sys + +from oslo_config import cfg + +from ironic_inspector.common.i18n import _ +from ironic_inspector.common import service_utils +from ironic_inspector import main + +CONF = cfg.CONF + + +def initialize_wsgi_app(): + # Parse config file and command line options, then start logging + service_utils.prepare_service(sys.argv[1:]) + + if CONF.standalone: + msg = _('To run ironic-inspector-api, [DEFAULT]standalone should be ' + 'set to False.') + sys.exit(msg) + + return main.get_app() diff --git a/ironic_inspector/common/coordination.py b/ironic_inspector/common/coordination.py new file mode 100644 index 000000000..b163be4a9 --- /dev/null +++ b/ironic_inspector/common/coordination.py @@ -0,0 +1,137 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_concurrency import lockutils +from oslo_config import cfg +from oslo_log import log +from tooz import coordination + +from ironic_inspector import utils + +CONF = cfg.CONF +LOG = log.getLogger(__name__) + +COORDINATION_PREFIX = 'ironic_inspector' +COORDINATION_GROUP_NAME = '.'.join([COORDINATION_PREFIX, 'service_group']) +LOCK_PREFIX = 'ironic_inspector.' + + +class Coordinator(object): + """Tooz coordination wrapper.""" + + group_name = COORDINATION_GROUP_NAME.encode('ascii') + lock_prefix = LOCK_PREFIX + + def __init__(self, prefix=None): + """Creates a coordinator instance for service coordination. + + :param prefix: The prefix to be part of the member id of the service. + Different types of services on the same host should use + different prefix to work properly. + """ + self.coordinator = None + self.started = False + self.prefix = prefix if prefix else 'default' + + def start(self, heartbeat=True): + """Start coordinator. + + :param heartbeat: Whether spawns a new thread to keep heartbeating with + the tooz backend. Unless there is periodic task to + do heartbeat manually, it should be always set to + True. + """ + if self.started: + return + + member_id = '.'.join([COORDINATION_PREFIX, self.prefix, + CONF.host]).encode('ascii') + self.coordinator = coordination.get_coordinator( + CONF.coordination.backend_url, member_id) + self.coordinator.start(start_heart=heartbeat) + self.started = True + LOG.debug('Coordinator started successfully.') + + def stop(self): + """Disconnect from coordination backend and stop heartbeat.""" + if self.started: + try: + self.coordinator.stop() + except Exception as e: + LOG.error('Failed to stop coordinator: %s', e) + self.coordinator = None + self.started = False + LOG.debug('Coordinator stopped successfully') + + def _validate_state(self): + if not self.started: + raise utils.Error('Coordinator should be started before ' + 'executing coordination actions.') + + def _create_group(self): + try: + request = self.coordinator.create_group(self.group_name) + request.get() + except coordination.GroupAlreadyExist: + LOG.debug('Group %s already exists.', self.group_name) + + def join_group(self): + """Join service group.""" + self._validate_state() + try: + request = self.coordinator.join_group(self.group_name) + request.get() + except coordination.GroupNotCreated: + self._create_group() + request = self.coordinator.join_group(self.group_name) + request.get() + except coordination.MemberAlreadyExist: + pass + LOG.debug('Joined group %s', self.group_name) + + def leave_group(self): + """Leave service group""" + self._validate_state() + try: + request = self.coordinator.leave_group(self.group_name) + request.get() + LOG.debug('Left group %s', self.group_name) + except coordination.MemberNotJoined: + LOG.debug('Leaving a non-existing group.') + + def get_members(self): + """Get members in the service group.""" + self._validate_state() + try: + result = self.coordinator.get_members(self.group_name) + return result.get() + except coordination.GroupNotCreated: + # If the group does not exist, there should be no members in it. + return set() + + def get_lock(self, uuid): + """Get lock for node uuid.""" + self._validate_state() + lock_name = (self.lock_prefix + uuid).encode('ascii') + return self.coordinator.get_lock(lock_name) + + +_COORDINATOR = None + + +@lockutils.synchronized('inspector_coordinator') +def get_coordinator(prefix=None): + global _COORDINATOR + if _COORDINATOR is None: + _COORDINATOR = Coordinator(prefix=prefix) + return _COORDINATOR diff --git a/ironic_inspector/common/locking.py b/ironic_inspector/common/locking.py index 3b9456971..9c973dd92 100644 --- a/ironic_inspector/common/locking.py +++ b/ironic_inspector/common/locking.py @@ -17,7 +17,7 @@ from oslo_concurrency import lockutils from oslo_config import cfg import six -from ironic_inspector import utils +from ironic_inspector.common import coordination CONF = cfg.CONF _LOCK_TEMPLATE = 'node-%s' @@ -70,11 +70,14 @@ class InternalLock(BaseLock): class ToozLock(BaseLock): - """Locking mechanism based on tooz.""" + """Wrapper on tooz locks.""" - def __init__(self, coordinator, uuid, prefix='ironic_inspector_'): - name = (prefix + uuid).encode() - self._lock = coordinator.get_lock(name) + def __init__(self, lock): + """Creates a wrapper on the tooz lock. + + :param lock: a tooz lock instance. + """ + self._lock = lock def acquire(self, blocking=True): if not self._lock.acquired: @@ -100,5 +103,6 @@ def get_lock(uuid): if CONF.standalone: return InternalLock(uuid) - coordinator = utils.get_coordinator() - return ToozLock(coordinator, uuid) + coordinator = coordination.get_coordinator() + lock = coordinator.get_lock(uuid) + return ToozLock(lock) diff --git a/ironic_inspector/common/rpc.py b/ironic_inspector/common/rpc.py index 94f49efad..3726531ef 100644 --- a/ironic_inspector/common/rpc.py +++ b/ironic_inspector/common/rpc.py @@ -30,10 +30,18 @@ def get_transport(): return TRANSPORT -def get_client(): - """Get a RPC client instance.""" - target = messaging.Target(topic=manager.MANAGER_TOPIC, server=CONF.host, - version='1.2') +def get_client(topic=None): + """Get a RPC client instance. + + :param topic: The topic of the message will be delivered to. This argument + is ignored if CONF.standalone is True. + """ + if CONF.standalone: + target = messaging.Target(topic=manager.MANAGER_TOPIC, + server=CONF.host, + version='1.3') + else: + target = messaging.Target(topic=topic, version='1.3') transport = get_transport() return messaging.RPCClient(transport, target) @@ -43,7 +51,7 @@ def get_server(endpoints): transport = get_transport() target = messaging.Target(topic=manager.MANAGER_TOPIC, server=CONF.host, - version='1.2') + version='1.3') return messaging.get_rpc_server( transport, target, endpoints, executor='eventlet', access_policy=dispatcher.DefaultRPCAccessPolicy) diff --git a/ironic_inspector/conductor/manager.py b/ironic_inspector/conductor/manager.py index d58965254..03c90aea2 100644 --- a/ironic_inspector/conductor/manager.py +++ b/ironic_inspector/conductor/manager.py @@ -23,7 +23,9 @@ from oslo_log import log import oslo_messaging as messaging from oslo_utils import excutils from oslo_utils import reflection +import tooz +from ironic_inspector.common import coordination from ironic_inspector.common.i18n import _ from ironic_inspector.common import ironic as ir_utils from ironic_inspector.common import keystone @@ -37,12 +39,12 @@ from ironic_inspector import utils LOG = log.getLogger(__name__) CONF = cfg.CONF -MANAGER_TOPIC = 'ironic-inspector-conductor' +MANAGER_TOPIC = 'ironic_inspector.conductor' class ConductorManager(object): """ironic inspector conductor manager""" - RPC_API_VERSION = '1.2' + RPC_API_VERSION = '1.3' target = messaging.Target(version=RPC_API_VERSION) @@ -98,9 +100,10 @@ class ConductorManager(object): if not CONF.standalone: try: - coordinator = utils.get_coordinator() - coordinator.start() - except Exception: + coordinator = coordination.get_coordinator(prefix='conductor') + coordinator.start(heartbeat=True) + coordinator.join_group() + except tooz.ToozError: with excutils.save_and_reraise_exception(): LOG.critical('Failed when connecting to coordination ' 'backend.') @@ -109,6 +112,16 @@ class ConductorManager(object): LOG.info('Successfully connected to coordination backend.') def del_host(self): + """Shutdown the ironic inspector conductor service.""" + + if not CONF.standalone: + try: + coordinator = coordination.get_coordinator(prefix='conductor') + if coordinator.started: + coordinator.leave_group() + coordinator.stop() + except tooz.ToozError: + LOG.exception('Failed to stop coordinator') if not self._shutting_down.acquire(blocking=False): LOG.warning('Attempted to shut down while already shutting down') @@ -133,14 +146,6 @@ class ConductorManager(object): self._shutting_down.release() - if not CONF.standalone: - try: - coordinator = utils.get_coordinator() - if coordinator and coordinator.is_started: - coordinator.stop() - except Exception: - LOG.exception('Failed to stop coordinator') - LOG.info('Shut down successfully') def _periodics_watchdog(self, callable_, activity, spacing, exc_info, @@ -177,6 +182,10 @@ class ConductorManager(object): process.reapply(node_uuid, data=data) + @messaging.expected_exceptions(utils.Error) + def do_continue(self, context, data): + return process.process(data) + def periodic_clean_up(): # pragma: no cover try: diff --git a/ironic_inspector/main.py b/ironic_inspector/main.py index 63de7f4f3..0b9d18d70 100644 --- a/ironic_inspector/main.py +++ b/ironic_inspector/main.py @@ -12,6 +12,7 @@ # limitations under the License. import os +import random import re import flask @@ -21,9 +22,11 @@ import six from ironic_inspector import api_tools from ironic_inspector.common import context +from ironic_inspector.common import coordination from ironic_inspector.common.i18n import _ from ironic_inspector.common import ironic as ir_utils from ironic_inspector.common import rpc +from ironic_inspector.conductor import manager import ironic_inspector.conf from ironic_inspector.conf import opts as conf_opts from ironic_inspector import node_cache @@ -34,7 +37,7 @@ from ironic_inspector import utils CONF = ironic_inspector.conf.CONF -app = flask.Flask(__name__) +_app = flask.Flask(__name__) LOG = utils.getProcessingLogger(__name__) MINIMUM_API_VERSION = (1, 0) @@ -43,6 +46,55 @@ DEFAULT_API_VERSION = CURRENT_API_VERSION _LOGGING_EXCLUDED_KEYS = ('logs',) +def _init_middleware(): + """Initialize WSGI middleware. + + :returns: None + """ + if CONF.auth_strategy != 'noauth': + utils.add_auth_middleware(_app) + else: + LOG.warning('Starting unauthenticated, please check' + ' configuration') + utils.add_cors_middleware(_app) + + +def get_app(): + """Get the flask instance.""" + _init_middleware() + return _app + + +# TODO(kaifeng) Extract rpc related code into a rpcapi module +def get_random_topic(): + coordinator = coordination.get_coordinator(prefix='api') + members = coordinator.get_members() + hosts = [] + for member in members: + # NOTE(kaifeng) recomposite host in case it contains '.' + parts = member.decode('ascii').split('.') + if len(parts) < 3: + LOG.warning('Found invalid member %s', member) + continue + + if parts[1] == 'conductor': + hosts.append('.'.join(parts[2:])) + + if not hosts: + raise utils.NoAvailableConductor('No available conductor service') + + topic = '%s.%s' % (manager.MANAGER_TOPIC, random.choice(hosts)) + return topic + + +def get_client_compat(): + if CONF.standalone: + return rpc.get_client() + + topic = get_random_topic() + return rpc.get_client(topic) + + def _get_version(): ver = flask.request.headers.get(conf_opts.VERSION_HEADER, _DEFAULT_API_VERSION) @@ -88,7 +140,16 @@ def convert_exceptions(func): return wrapper -@app.before_request +@_app.before_first_request +def start_coordinator(): + """Create a coordinator instance for non-standalone case.""" + if not CONF.standalone: + coordinator = coordination.get_coordinator(prefix='api') + coordinator.start(heartbeat=False) + LOG.info('Sucessfully created coordinator.') + + +@_app.before_request def check_api_version(): requested = _get_version() @@ -101,7 +162,7 @@ def check_api_version(): code=406) -@app.after_request +@_app.after_request def add_version_headers(res): res.headers[conf_opts.MIN_VERSION_HEADER] = '%s.%s' % MINIMUM_API_VERSION res.headers[conf_opts.MAX_VERSION_HEADER] = '%s.%s' % CURRENT_API_VERSION @@ -166,7 +227,7 @@ def api(path, is_public_api=False, rule=None, verb_to_rule_map=None, :param kwargs: all the rest kwargs are passed to flask app.route """ def outer(func): - @app.route(path, **flask_kwargs) + @_app.route(path, **flask_kwargs) @convert_exceptions @six.wraps(func) def wrapper(*args, **kwargs): @@ -206,7 +267,7 @@ def version_root(version): pat = re.compile(r'^\/%s\/[^\/]*?$' % version) resources = [] - for url in app.url_map.iter_rules(): + for url in _app.url_map.iter_rules(): if pat.match(str(url)): resources.append(url) @@ -229,7 +290,9 @@ def api_continue(): LOG.debug("Received data from the ramdisk: %s", logged_data, data=data) - return flask.jsonify(process.process(data)) + client = get_client_compat() + result = client.call({}, 'do_continue', data=data) + return flask.jsonify(result) # TODO(sambetts) Add API discovery for this endpoint @@ -253,7 +316,7 @@ def api_introspection(node_id): 'installation cannot manage boot (' '(can_manage_boot set to False)'), code=400) - client = rpc.get_client() + client = get_client_compat() client.call({}, 'do_introspection', node_id=node_id, manage_boot=manage_boot, token=flask.request.headers.get('X-Auth-Token')) @@ -279,7 +342,7 @@ def api_introspection_statuses(): @api('/v1/introspection//abort', rule="introspection:abort", methods=['POST']) def api_introspection_abort(node_id): - client = rpc.get_client() + client = get_client_compat() client.call({}, 'do_abort', node_id=node_id, token=flask.request.headers.get('X-Auth-Token')) return '', 202 @@ -321,7 +384,7 @@ def api_introspection_reapply(node_id): node = ir_utils.get_node(node_id, fields=['uuid']) node_id = node.uuid - client = rpc.get_client() + client = get_client_compat() client.call({}, 'do_reapply', node_uuid=node_id, data=data) return '', 202 @@ -374,6 +437,6 @@ def api_rule(uuid): return '', 204 -@app.errorhandler(404) +@_app.errorhandler(404) def handle_404(error): return error_response(error, code=404) diff --git a/ironic_inspector/test/unit/test_coordination.py b/ironic_inspector/test/unit/test_coordination.py new file mode 100644 index 000000000..768c323a3 --- /dev/null +++ b/ironic_inspector/test/unit/test_coordination.py @@ -0,0 +1,123 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import fixtures +import mock +from oslo_config import cfg +import tooz + +from ironic_inspector.common import coordination +from ironic_inspector.test import base +from ironic_inspector import utils + +CONF = cfg.CONF + + +@mock.patch.object(coordination, 'Coordinator', autospec=True) +class TestGetCoordinator(base.BaseTest): + def setUp(self): + super(TestGetCoordinator, self).setUp() + coordination._COORDINATOR = None + + def test_get(self, mock_coordinator): + + coordination.get_coordinator() + mock_coordinator.assert_called_once_with(prefix=None) + + def test_get_with_prefix(self, mock_coordinator): + coordination.get_coordinator(prefix='conductor') + mock_coordinator.assert_called_once_with(prefix='conductor') + + +class TestCoordinator(base.BaseTest): + def setUp(self): + super(TestCoordinator, self).setUp() + self.coordinator = coordination.Coordinator(prefix='test') + self.mock_driver = self.useFixture( + fixtures.MockPatchObject(tooz.coordination, 'CoordinationDriver', + autospec=True)).mock + self.mock_get_coordinator = self.useFixture( + fixtures.MockPatchObject(tooz.coordination, 'get_coordinator', + autospec=True)).mock + self.mock_get_coordinator.return_value = self.mock_driver + self.group_name = coordination.COORDINATION_GROUP_NAME.encode('ascii') + + def test_start(self): + CONF.set_override('backend_url', 'memcached://1.2.3.4:11211', + 'coordination') + CONF.set_override('host', '1.2.3.5') + self.coordinator.start() + self.mock_get_coordinator.assert_called_once_with( + 'memcached://1.2.3.4:11211', b'ironic_inspector.test.1.2.3.5') + self.assertTrue(self.coordinator.started) + self.mock_driver.start.assert_called_once_with(start_heart=True) + + def test_stop(self): + self.coordinator.started = True + self.coordinator.coordinator = mock.MagicMock() + self.coordinator.stop() + self.assertFalse(self.coordinator.started) + + def test__create_group(self): + self.coordinator.start() + self.coordinator._create_group() + self.mock_driver.create_group.assert_called_once_with(self.group_name) + + def test_join_group(self): + self.coordinator.start() + self.coordinator.join_group() + self.mock_driver.join_group.assert_called_once_with(self.group_name) + + def test_join_group_not_exist(self): + self.coordinator.start() + self.mock_driver.join_group.side_effect = [ + tooz.coordination.GroupNotCreated('a group'), mock.Mock()] + self.coordinator.join_group() + self.mock_driver.create_group.assert_called_once_with(self.group_name) + self.mock_driver.join_group.assert_has_calls([ + mock.call(self.group_name), mock.call(self.group_name)]) + + def test_leave_group(self): + self.coordinator.start() + self.coordinator.leave_group() + self.mock_driver.leave_group.assert_called_once_with(self.group_name) + + def test_get_members(self): + self.coordinator.start() + mock_resp = mock.Mock() + mock_resp.get.return_value = {'host1', 'host2'} + self.mock_driver.get_members.return_value = mock_resp + members = self.coordinator.get_members() + self.assertEqual(members, {'host1', 'host2'}) + self.mock_driver.get_members.assert_called_once_with(self.group_name) + + def test_get_members_no_such_group(self): + self.coordinator.start() + self.mock_driver.get_members.side_effect = ( + tooz.coordination.GroupNotCreated('a group')) + self.assertEqual(self.coordinator.get_members(), set()) + + def test_get_lock(self): + self.coordinator.start() + self.coordinator.get_lock('fake-node') + self.mock_driver.get_lock.assert_called_once_with( + b'ironic_inspector.fake-node') + + def test_invalid_state(self): + self.assertRaisesRegex(utils.Error, 'Coordinator should be started', + self.coordinator.join_group) + self.assertRaisesRegex(utils.Error, 'Coordinator should be started', + self.coordinator.leave_group) + self.assertRaisesRegex(utils.Error, 'Coordinator should be started', + self.coordinator.get_members) + self.assertRaisesRegex(utils.Error, 'Coordinator should be started', + self.coordinator.get_lock, 'fake id') diff --git a/ironic_inspector/test/unit/test_locking.py b/ironic_inspector/test/unit/test_locking.py index 38de30ced..a9148ee9c 100644 --- a/ironic_inspector/test/unit/test_locking.py +++ b/ironic_inspector/test/unit/test_locking.py @@ -14,9 +14,9 @@ import mock from oslo_config import cfg +from ironic_inspector.common import coordination from ironic_inspector.common import locking from ironic_inspector.test import base as test_base -from ironic_inspector import utils CONF = cfg.CONF @@ -30,15 +30,17 @@ class TestGetLock(test_base.NodeTest): mock_internal.assert_called_once_with(self.node.uuid) mock_tooz.assert_not_called() - @mock.patch.object(utils, 'get_coordinator', autospec=True) + @mock.patch.object(coordination, 'get_coordinator', autospec=True) def test_get_lock_tooz(self, mock_get_coord, mock_tooz, mock_internal): CONF.set_override('standalone', False) - coordinator = mock.MagicMock() + mock_lock = mock.Mock() + coordinator = mock.Mock() + coordinator.get_lock.return_value = mock_lock mock_get_coord.return_value = coordinator locking.get_lock(self.node.uuid) - mock_tooz.assert_called_once_with(coordinator, self.node.uuid) + mock_tooz.assert_called_once_with(mock_lock) mock_internal.assert_not_called() @@ -61,8 +63,7 @@ class TestInternalLock(test_base.NodeTest): self.mock_lock.acquire.assert_called_once_with(blocking=True) def test_release(self): - lock = locking.ToozLock(mock.MagicMock(), self.node.uuid) - lock._lock = self.mock_lock + lock = locking.ToozLock(self.mock_lock) self.mock_lock._locked = True lock.release() @@ -86,32 +87,21 @@ class TestToozLock(test_base.NodeTest): self.mock_lock.acquire.return_value = True self.mock_lock.acquired = False - def test_lock_default_prefix(self): - mock_coordinator = mock.MagicMock() - locking.ToozLock(mock_coordinator, self.node.uuid) - mock_coordinator.get_lock.assert_called_once_with( - str.encode('ironic_inspector_%s' % self.node.uuid)) - def test_acquire(self): - lock = locking.ToozLock(mock.MagicMock(), self.node.uuid) - lock._lock = self.mock_lock - + lock = locking.ToozLock(self.mock_lock) lock.acquire() - self.mock_lock.acquire.assert_called_once_with(blocking=True) def test_release(self): self.mock_lock.acquired = True - lock = locking.ToozLock(mock.MagicMock(), self.node.uuid) - lock._lock = self.mock_lock + lock = locking.ToozLock(self.mock_lock) lock.release() self.mock_lock.release.assert_called_once_with() def test_context(self): - lock = locking.ToozLock(mock.MagicMock(), self.node.uuid) - lock._lock = self.mock_lock + lock = locking.ToozLock(self.mock_lock) with lock: self.mock_lock.acquire.assert_called_once_with() diff --git a/ironic_inspector/test/unit/test_main.py b/ironic_inspector/test/unit/test_main.py index ce0a50337..d9503d37c 100644 --- a/ironic_inspector/test/unit/test_main.py +++ b/ironic_inspector/test/unit/test_main.py @@ -20,6 +20,7 @@ import mock import oslo_messaging as messaging from oslo_utils import uuidutils +from ironic_inspector.common import coordination from ironic_inspector.common import ironic as ir_utils from ironic_inspector.common import rpc from ironic_inspector.common import swift @@ -31,7 +32,6 @@ from ironic_inspector import node_cache from ironic_inspector.plugins import base as plugins_base from ironic_inspector.plugins import example as example_plugin from ironic_inspector.plugins import introspection_data as intros_data_plugin -from ironic_inspector import process from ironic_inspector import rules from ironic_inspector.test import base as test_base from ironic_inspector import utils @@ -46,20 +46,17 @@ def _get_error(res): class BaseAPITest(test_base.BaseTest): def setUp(self): super(BaseAPITest, self).setUp() - main.app.config['TESTING'] = True - self.app = main.app.test_client() + main._app.config['TESTING'] = True + self.app = main._app.test_client() CONF.set_override('auth_strategy', 'noauth') self.uuid = uuidutils.generate_uuid() - - -class TestApiIntrospect(BaseAPITest): - def setUp(self): - super(TestApiIntrospect, self).setUp() self.rpc_get_client_mock = self.useFixture( fixtures.MockPatchObject(rpc, 'get_client', autospec=True)).mock self.client_mock = mock.MagicMock(spec=messaging.RPCClient) self.rpc_get_client_mock.return_value = self.client_mock + +class TestApiIntrospect(BaseAPITest): def test_introspect_no_authentication(self): CONF.set_override('auth_strategy', 'noauth') @@ -125,40 +122,34 @@ class TestApiIntrospect(BaseAPITest): self.assertFalse(self.client_mock.call.called) -@mock.patch.object(process, 'process', autospec=True) class TestApiContinue(BaseAPITest): - def test_continue(self, process_mock): + def test_continue(self): # should be ignored CONF.set_override('auth_strategy', 'keystone') - process_mock.return_value = {'result': 42} + self.client_mock.call.return_value = {'result': 42} res = self.app.post('/v1/continue', data='{"foo": "bar"}') self.assertEqual(200, res.status_code) - process_mock.assert_called_once_with({"foo": "bar"}) + self.client_mock.call.assert_called_once_with({}, 'do_continue', + data={"foo": "bar"}) self.assertEqual({"result": 42}, json.loads(res.data.decode())) - def test_continue_failed(self, process_mock): - process_mock.side_effect = utils.Error("boom") + def test_continue_failed(self): + self.client_mock.call.side_effect = utils.Error("boom") res = self.app.post('/v1/continue', data='{"foo": "bar"}') self.assertEqual(400, res.status_code) - process_mock.assert_called_once_with({"foo": "bar"}) + self.client_mock.call.assert_called_once_with({}, 'do_continue', + data={"foo": "bar"}) self.assertEqual('boom', _get_error(res)) - def test_continue_wrong_type(self, process_mock): + def test_continue_wrong_type(self): res = self.app.post('/v1/continue', data='42') self.assertEqual(400, res.status_code) self.assertEqual('Invalid data: expected a JSON object, got int', _get_error(res)) - self.assertFalse(process_mock.called) + self.client_mock.call.assert_not_called() class TestApiAbort(BaseAPITest): - def setUp(self): - super(TestApiAbort, self).setUp() - self.rpc_get_client_mock = self.useFixture( - fixtures.MockPatchObject(rpc, 'get_client', autospec=True)).mock - self.client_mock = mock.MagicMock(spec=messaging.RPCClient) - self.rpc_get_client_mock.return_value = self.client_mock - def test_ok(self): res = self.app.post('/v1/introspection/%s/abort' % self.uuid, @@ -360,10 +351,6 @@ class TestApiReapply(BaseAPITest): def setUp(self): super(TestApiReapply, self).setUp() - self.rpc_get_client_mock = self.useFixture( - fixtures.MockPatchObject(rpc, 'get_client', autospec=True)).mock - self.client_mock = mock.MagicMock(spec=messaging.RPCClient) - self.rpc_get_client_mock.return_value = self.client_mock CONF.set_override('store_data', 'swift', 'processing') def test_api_ok(self): @@ -590,7 +577,7 @@ class TestApiVersions(BaseAPITest): }]} self.assertEqual(expected, json_data) - @mock.patch.object(main.app.url_map, "iter_rules", autospec=True) + @mock.patch.object(main._app.url_map, "iter_rules", autospec=True) def test_version_endpoint(self, mock_rules): mock_rules.return_value = ["/v1/endpoint1", "/v1/endpoint2/", "/v1/endpoint1/", @@ -626,11 +613,8 @@ class TestApiVersions(BaseAPITest): # API version on unknown pages self._check_version_present(self.app.get('/v1/foobar')) - @mock.patch.object(rpc, 'get_client', autospec=True) @mock.patch.object(node_cache, 'get_node', autospec=True) - def test_usual_requests(self, get_mock, rpc_mock): - client_mock = mock.MagicMock(spec=messaging.RPCClient) - rpc_mock.return_value = client_mock + def test_usual_requests(self, get_mock): get_mock.return_value = node_cache.NodeInfo(uuid=self.uuid, started_at=42.0) # Successful @@ -681,3 +665,71 @@ class TestPlugins(unittest.TestCase): def test_manager_is_cached(self): self.assertIs(plugins_base.processing_hooks_manager(), plugins_base.processing_hooks_manager()) + + +class TestTopic(test_base.BaseTest): + def setUp(self): + super(TestTopic, self).setUp() + self.transport_mock = self.useFixture( + fixtures.MockPatchObject(rpc, 'get_transport', + autospec=True)).mock + self.target_mock = self.useFixture( + fixtures.MockPatchObject(rpc.messaging, 'Target', + autospec=True)).mock + self.rpcclient_mock = self.useFixture( + fixtures.MockPatchObject(rpc.messaging, 'RPCClient', + autospec=True)).mock + CONF.set_override('host', 'a-host') + + def test_get_client_compat_standalone(self): + main.get_client_compat() + self.target_mock.assert_called_with(topic='ironic_inspector.conductor', + server='a-host', version=mock.ANY) + + @mock.patch.object(main, 'get_random_topic', autospec=True) + def test_get_client_compat_non_standalone(self, mock_get_topic): + CONF.set_override('host', 'a-host') + CONF.set_override('standalone', False) + mock_get_topic.return_value = 'hello' + main.get_client_compat() + self.target_mock.assert_called_with( + topic='hello', version=mock.ANY) + + @mock.patch.object(coordination, 'get_coordinator') + def test_get_random_topic(self, mock_get_coordinator): + mock_coordinator = mock.Mock(spec=['get_members']) + members = [('ironic_inspector.conductor.host%s' % i).encode('ascii') + for i in range(5)] + topics = [('ironic_inspector.conductor.host%s' % i) for i in range(5)] + mock_coordinator.get_members.return_value = set(members) + mock_get_coordinator.return_value = mock_coordinator + for i in range(10): + topic = main.get_random_topic() + self.assertIn(topic, topics) + + @mock.patch.object(coordination, 'get_coordinator') + def test_get_random_topic_host_with_domain(self, mock_get_coordinator): + mock_coordinator = mock.Mock(spec=['get_members']) + members = ['ironic_inspector.conductor.' + 'local.domain'.encode('ascii')] + mock_coordinator.get_members.return_value = set(members) + mock_get_coordinator.return_value = mock_coordinator + topic = main.get_random_topic() + self.assertEqual(topic, 'ironic_inspector.conductor.local.domain') + + @mock.patch.object(coordination, 'get_coordinator') + def test_get_random_topic_host_bypass_invalid(self, mock_get_coordinator): + mock_coordinator = mock.Mock(spec=['get_members']) + members = ['this_should_not_happen'.encode('ascii')] + mock_coordinator.get_members.return_value = set(members) + mock_get_coordinator.return_value = mock_coordinator + self.assertRaisesRegex(utils.NoAvailableConductor, + 'No available conductor', + main.get_random_topic) + + @mock.patch.object(coordination, 'get_coordinator') + def test_get_random_topic_no_member(self, mock_get_coordinator): + mock_coordinator = mock.Mock(spec=['get_members']) + mock_coordinator.get_members.return_value = set() + mock_get_coordinator.return_value = mock_coordinator + self.assertRaises(utils.NoAvailableConductor, main.get_random_topic) diff --git a/ironic_inspector/test/unit/test_manager.py b/ironic_inspector/test/unit/test_manager.py index 8ddcf9cbe..d4264499d 100644 --- a/ironic_inspector/test/unit/test_manager.py +++ b/ironic_inspector/test/unit/test_manager.py @@ -17,7 +17,9 @@ import fixtures from ironic_lib import mdns import mock import oslo_messaging as messaging +import tooz +from ironic_inspector.common import coordination from ironic_inspector.common import keystone from ironic_inspector.common import swift from ironic_inspector.conductor import manager @@ -139,7 +141,7 @@ class TestManagerInitHost(BaseManagerTest): mock_zc.return_value.register_service.assert_called_once_with( 'baremetal-introspection', mock_endpoint.return_value) - @mock.patch.object(utils, 'get_coordinator', autospec=True) + @mock.patch.object(coordination, 'get_coordinator', autospec=True) @mock.patch.object(keystone, 'get_endpoint', autospec=True) def test_init_host_with_coordinator(self, mock_endpoint, mock_get_coord): CONF.set_override('standalone', False) @@ -150,24 +152,24 @@ class TestManagerInitHost(BaseManagerTest): self.mock_validate_processing_hooks.assert_called_once_with() self.mock_filter.init_filter.assert_called_once_with() self.assert_periodics() - mock_get_coord.assert_called_once_with() - mock_coordinator.start.assert_called_once_with() + mock_get_coord.assert_called_once_with(prefix='conductor') + mock_coordinator.start.assert_called_once_with(heartbeat=True) @mock.patch.object(manager.ConductorManager, 'del_host') - @mock.patch.object(utils, 'get_coordinator', autospec=True) + @mock.patch.object(coordination, 'get_coordinator', autospec=True) @mock.patch.object(keystone, 'get_endpoint', autospec=True) def test_init_host_with_coordinator_failed(self, mock_endpoint, mock_get_coord, mock_del_host): CONF.set_override('standalone', False) - mock_get_coord.side_effect = (utils.Error('Reaching coordination ' - 'backend failed.'), + mock_get_coord.side_effect = (tooz.ToozError('Reaching coordination ' + 'backend failed.'), None) - self.assertRaises(utils.Error, self.manager.init_host) + self.assertRaises(tooz.ToozError, self.manager.init_host) self.mock_db_init.assert_called_once_with() self.mock_validate_processing_hooks.assert_called_once_with() self.mock_filter.init_filter.assert_called_once_with() self.assert_periodics() - mock_get_coord.assert_called_once_with() + mock_get_coord.assert_called_once_with(prefix='conductor') mock_del_host.assert_called_once_with() @@ -282,11 +284,11 @@ class TestManagerDelHost(BaseManagerTest): self.mock_filter.tear_down_filter.assert_called_once_with() self.mock__shutting_down.release.assert_called_once_with() - @mock.patch.object(utils, 'get_coordinator', autospec=True) + @mock.patch.object(coordination, 'get_coordinator', autospec=True) def test_del_host_with_coordinator(self, mock_get_coord): CONF.set_override('standalone', False) - mock_coordinator = mock.MagicMock() - mock_coordinator.is_started = True + mock_coordinator = mock.Mock(spec=coordination.Coordinator) + mock_coordinator.started = True mock_get_coord.return_value = mock_coordinator self.manager.del_host() @@ -500,3 +502,20 @@ class TestManagerReapply(BaseManagerTest): store_mock.assert_called_once_with(self.uuid, self.data, processed=False) self.assertFalse(get_mock.called) + + +class TestManagerContinue(BaseManagerTest): + @mock.patch.object(process, 'process', autospec=True) + def test_continue_ok(self, process_mock): + self.manager.do_continue(self.context, self.data) + process_mock.assert_called_once_with(self.data) + + @mock.patch.object(process, 'process', autospec=True) + def test_continue_failed(self, process_mock): + process_mock.side_effect = utils.Error("Boom.") + exc = self.assertRaises(messaging.rpc.ExpectedException, + self.manager.do_continue, + self.context, self.data) + + self.assertEqual(utils.Error, exc.exc_info[0]) + process_mock.assert_called_once_with(self.data) diff --git a/ironic_inspector/test/unit/test_utils.py b/ironic_inspector/test/unit/test_utils.py index 49abac76f..17b335460 100644 --- a/ironic_inspector/test/unit/test_utils.py +++ b/ironic_inspector/test/unit/test_utils.py @@ -154,14 +154,3 @@ class TestIsoTimestamp(base.BaseTest): def test_none(self): self.assertIsNone(utils.iso_timestamp(None)) - - -@mock.patch.object(utils, 'coordination', autospec=True) -class TestGetCoordinator(base.BaseTest): - def test_get(self, mock_coordination): - CONF.set_override('backend_url', 'etcd3://1.2.3.4:2379', - 'coordination') - CONF.set_override('host', '1.2.3.5') - utils.get_coordinator() - mock_coordination.get_coordinator.assert_called_once_with( - 'etcd3://1.2.3.4:2379', b'1.2.3.5') diff --git a/ironic_inspector/test/unit/test_wsgi_service.py b/ironic_inspector/test/unit/test_wsgi_service.py index f3d70becb..cb95d24fc 100644 --- a/ironic_inspector/test/unit/test_wsgi_service.py +++ b/ironic_inspector/test/unit/test_wsgi_service.py @@ -15,7 +15,9 @@ import eventlet # noqa import fixtures from oslo_config import cfg +from ironic_inspector import main from ironic_inspector.test import base as test_base +from ironic_inspector import utils from ironic_inspector import wsgi_service CONF = cfg.CONF @@ -26,37 +28,32 @@ class BaseWSGITest(test_base.BaseTest): # generic mocks setUp method super(BaseWSGITest, self).setUp() self.app = self.useFixture(fixtures.MockPatchObject( - wsgi_service.app, 'app', autospec=True)).mock + main, '_app', autospec=True)).mock self.server = self.useFixture(fixtures.MockPatchObject( wsgi_service.wsgi, 'Server', autospec=True)).mock - self.mock_log = self.useFixture(fixtures.MockPatchObject( - wsgi_service, 'LOG')).mock - self.service = wsgi_service.WSGIService() - self.service.server = self.server class TestWSGIServiceInitMiddleware(BaseWSGITest): def setUp(self): super(TestWSGIServiceInitMiddleware, self).setUp() self.mock_add_auth_middleware = self.useFixture( - fixtures.MockPatchObject(wsgi_service.utils, - 'add_auth_middleware')).mock + fixtures.MockPatchObject(utils, 'add_auth_middleware')).mock self.mock_add_cors_middleware = self.useFixture( - fixtures.MockPatchObject(wsgi_service.utils, - 'add_cors_middleware')).mock + fixtures.MockPatchObject(utils, 'add_cors_middleware')).mock + self.mock_log = self.useFixture(fixtures.MockPatchObject( + main, 'LOG')).mock # 'positive' settings CONF.set_override('auth_strategy', 'keystone') CONF.set_override('store_data', 'swift', 'processing') def test_init_middleware(self): - self.service._init_middleware() - + wsgi_service.WSGIService() self.mock_add_auth_middleware.assert_called_once_with(self.app) self.mock_add_cors_middleware.assert_called_once_with(self.app) def test_init_middleware_noauth(self): CONF.set_override('auth_strategy', 'noauth') - self.service._init_middleware() + wsgi_service.WSGIService() self.mock_add_auth_middleware.assert_not_called() self.mock_log.warning.assert_called_once_with( @@ -67,8 +64,10 @@ class TestWSGIServiceInitMiddleware(BaseWSGITest): class TestWSGIService(BaseWSGITest): def setUp(self): super(TestWSGIService, self).setUp() + self.service = wsgi_service.WSGIService() + self.service.server = self.server self.mock__init_middleware = self.useFixture(fixtures.MockPatchObject( - self.service, '_init_middleware')).mock + main, '_init_middleware')).mock # 'positive' settings CONF.set_override('listen_address', '42.42.42.42') @@ -76,8 +75,6 @@ class TestWSGIService(BaseWSGITest): def test_start(self): self.service.start() - - self.mock__init_middleware.assert_called_once_with() self.server.start.assert_called_once_with() def test_stop(self): diff --git a/ironic_inspector/utils.py b/ironic_inspector/utils.py index 3985ba044..f560d5133 100644 --- a/ironic_inspector/utils.py +++ b/ironic_inspector/utils.py @@ -21,7 +21,6 @@ from oslo_config import cfg from oslo_log import log from oslo_middleware import cors as cors_middleware import pytz -from tooz import coordination from ironic_inspector.common.i18n import _ from ironic_inspector import policy @@ -162,6 +161,13 @@ class IntrospectionDataNotFound(NotFoundInCacheError): """Introspection data not found.""" +class NoAvailableConductor(Error): + """No available conductor in the service group.""" + + def __init__(self, msg, **kwargs): + super(NoAvailableConductor, self).__init__(msg, code=503, **kwargs) + + def executor(): """Return the current futures executor.""" global _EXECUTOR @@ -248,14 +254,3 @@ def iso_timestamp(timestamp=None, tz=pytz.timezone('utc')): return None date = datetime.datetime.fromtimestamp(timestamp, tz=tz) return date.isoformat() - - -_COORDINATOR = None - - -def get_coordinator(): - global _COORDINATOR - if _COORDINATOR is None: - _COORDINATOR = coordination.get_coordinator( - CONF.coordination.backend_url, str.encode(CONF.host)) - return _COORDINATOR diff --git a/ironic_inspector/wsgi_service.py b/ironic_inspector/wsgi_service.py index b42e9c9a8..f8e55942d 100644 --- a/ironic_inspector/wsgi_service.py +++ b/ironic_inspector/wsgi_service.py @@ -15,8 +15,7 @@ from oslo_log import log from oslo_service import service from oslo_service import wsgi -from ironic_inspector import main as app -from ironic_inspector import utils +from ironic_inspector import main LOG = log.getLogger(__name__) CONF = cfg.CONF @@ -26,31 +25,18 @@ class WSGIService(service.Service): """Provides ability to launch API from wsgi app.""" def __init__(self): - self.app = app.app + self.app = main.get_app() self.server = wsgi.Server(CONF, 'ironic_inspector', self.app, host=CONF.listen_address, port=CONF.listen_port, use_ssl=CONF.use_ssl) - def _init_middleware(self): - """Initialize WSGI middleware. - - :returns: None - """ - if CONF.auth_strategy != 'noauth': - utils.add_auth_middleware(self.app) - else: - LOG.warning('Starting unauthenticated, please check' - ' configuration') - utils.add_cors_middleware(self.app) - def start(self): """Start serving this service using loaded configuration. :returns: None """ - self._init_middleware() self.server.start() def stop(self): diff --git a/releasenotes/notes/split-services-99873ff27ef2d89b.yaml b/releasenotes/notes/split-services-99873ff27ef2d89b.yaml new file mode 100644 index 000000000..986da5e11 --- /dev/null +++ b/releasenotes/notes/split-services-99873ff27ef2d89b.yaml @@ -0,0 +1,14 @@ +--- +features: + - | + Allows splitting the ironic-inspector service to ironic-inspector-api and + ironic-inspector-conductor which coordinate via tooz and its underlying + backend. A new configuration option ``[DEFAULT]standalone`` is introduced + to enable this feature. The configuration defaults to True, + ironic-inspector runs as a single service, which is compatible with the + old behavior. When set to False, ``ironic-inspector-api-wsgi`` is used to + start the API service, and ``ironic-inspector-conductor`` is used to start + the conductor service. For ironic-inspector running at non-standalone + mode, user needs to set the new configuration option + ``[coordination]backend_url``, which specifies the backend used for + coordination. diff --git a/setup.cfg b/setup.cfg index d7a4abf8e..98d13e0fd 100644 --- a/setup.cfg +++ b/setup.cfg @@ -28,8 +28,11 @@ packages = console_scripts = ironic-inspector = ironic_inspector.cmd.all:main ironic-inspector-dbsync = ironic_inspector.cmd.dbsync:main + ironic-inspector-conductor = ironic_inspector.cmd.conductor:main ironic-inspector-rootwrap = oslo_rootwrap.cmd:main ironic-inspector-migrate-data = ironic_inspector.cmd.migration:main +wsgi_scripts = + ironic-inspector-api-wsgi = ironic_inspector.cmd.wsgi:initialize_wsgi_app ironic_inspector.hooks.processing = scheduler = ironic_inspector.plugins.standard:SchedulerHook validate_interfaces = ironic_inspector.plugins.standard:ValidateInterfacesHook