Merge "Support using JSON-RPC instead of oslo.messaging"

This commit is contained in:
Zuul 2019-02-15 18:53:55 +00:00 committed by Gerrit Code Review
commit bc2bd6c22f
16 changed files with 1177 additions and 9 deletions

View File

@ -322,6 +322,10 @@ IRONIC_IPXE_USE_SWIFT=$(trueorfalse False IRONIC_IPXE_USE_SWIFT)
IRONIC_HTTP_DIR=${IRONIC_HTTP_DIR:-$IRONIC_DATA_DIR/httpboot}
IRONIC_HTTP_PORT=${IRONIC_HTTP_PORT:-3928}
# Allow using JSON RPC instead of oslo.messaging
IRONIC_RPC_TRANSPORT=${IRONIC_RPC_TRANSPORT:-oslo}
IRONIC_JSON_RPC_PORT=${IRONIC_JSON_RPC_PORT:-8089}
# Whether DevStack will be setup for bare metal or VMs
IRONIC_IS_HARDWARE=$(trueorfalse False IRONIC_IS_HARDWARE)
@ -1107,6 +1111,9 @@ function configure_ironic {
iniset $IRONIC_CONF_FILE agent deploy_logs_local_path $IRONIC_DEPLOY_LOGS_LOCAL_PATH
# Set image_download_source for direct interface
iniset $IRONIC_CONF_FILE agent image_download_source $IRONIC_AGENT_IMAGE_DOWNLOAD_SOURCE
# Configure JSON RPC backend
iniset $IRONIC_CONF_FILE DEFAULT rpc_transport $IRONIC_RPC_TRANSPORT
iniset $IRONIC_CONF_FILE json_rpc port $IRONIC_JSON_RPC_PORT
# Configure Ironic conductor, if it was enabled.
if is_service_enabled ir-cond; then
@ -1171,7 +1178,9 @@ function configure_ironic_api {
iniset $IRONIC_CONF_FILE DEFAULT auth_strategy $IRONIC_AUTH_STRATEGY
configure_auth_token_middleware $IRONIC_CONF_FILE ironic $IRONIC_AUTH_CACHE_DIR/api
iniset_rpc_backend ironic $IRONIC_CONF_FILE
if is_service_enabled rabbit; then
iniset_rpc_backend ironic $IRONIC_CONF_FILE
fi
iniset $IRONIC_CONF_FILE conductor automated_clean $IRONIC_AUTOMATED_CLEAN_ENABLED
@ -1212,7 +1221,7 @@ function configure_ironic_conductor {
# NOTE(pas-ha) service_catalog section is used to discover
# ironic API endpoint from keystone catalog
local client_sections="neutron swift glance inspector cinder service_catalog"
local client_sections="neutron swift glance inspector cinder service_catalog json_rpc"
for conf_section in $client_sections; do
configure_client_for $conf_section
done

View File

@ -29,6 +29,47 @@ Configuring ironic-api service
# configuration. (string value)
transport_url = rabbit://RPC_USER:RPC_PASSWORD@RPC_HOST:RPC_PORT/
Alternatively, you can use JSON RPC for interactions between
ironic-conductor and ironic-api. Enable it in the configuration and provide
the keystone credentials to use for authentication:
.. code-block:: ini
[DEFAULT]
rpc_transport = json-rpc
[json_rpc]
# Authentication type to load (string value)
auth_type = password
# Authentication URL (string value)
auth_url=https://IDENTITY_IP:5000/
# Username (string value)
username=ironic
# User's password (string value)
password=IRONIC_PASSWORD
# Project name to scope to (string value)
project_name=service
# Domain ID containing project (string value)
project_domain_id=default
# User's domain id (string value)
user_domain_id=default
If you use port other than the default 8089 for JSON RPC, you have to
configure it, for example:
.. code-block:: ini
[json_rpc]
port = 9999
#. Configure the ironic-api service to use these credentials with the Identity
service. Replace ``PUBLIC_IDENTITY_IP`` with the public IP of the Identity
server, ``PRIVATE_IDENTITY_IP`` with the private IP of the Identity server

View File

@ -42,6 +42,56 @@ Configuring ironic-conductor service
# configuration. (string value)
transport_url = rabbit://RPC_USER:RPC_PASSWORD@RPC_HOST:RPC_PORT/
Alternatively, you can use JSON RPC for interactions between
ironic-conductor and ironic-api. Enable it in the configuration and provide
the keystone credentials to use for authenticating incoming requests (can
be the same as for the API):
.. code-block:: ini
[DEFAULT]
rpc_transport = json-rpc
[keystone_authtoken]
# Authentication type to load (string value)
auth_type=password
# Complete public Identity API endpoint (string value)
www_authenticate_uri=http://PUBLIC_IDENTITY_IP:5000
# Complete admin Identity API endpoint. (string value)
auth_url=http://PRIVATE_IDENTITY_IP:5000
# Service username. (string value)
username=ironic
# Service account password. (string value)
password=IRONIC_PASSWORD
# Service tenant name. (string value)
project_name=service
# Domain name containing project (string value)
project_domain_name=Default
# User's domain name (string value)
user_domain_name=Default
You can optionally change the host and the port the JSON RPC service will
bind to, for example:
.. code-block:: ini
[json_rpc]
host_ip = 192.168.0.10
port = 9999
.. warning::
Hostnames of ironic-conductor machines must be resolvable by ironic-api
services when JSON RPC is used.
#. Configure credentials for accessing other OpenStack services.
In order to communicate with other OpenStack services, the Bare Metal

View File

@ -30,6 +30,14 @@ You should make the following changes to ``/etc/ironic/ironic.conf``:
Networking since it will do all the dynamically changing configurations
for you.
#. If you want to disable using a messaging broker between conductor and API
processes, switch to JSON RPC instead:
.. code-block:: ini
[DEFAULT]
rpc_transport = json-rpc
If you don't use Image service, it's possible to provide images to Bare Metal
service via a URL.

View File

@ -0,0 +1,20 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
CONF = cfg.CONF
def require_authentication():
return (CONF.json_rpc.auth_strategy or CONF.auth_strategy) == 'keystone'

View File

@ -0,0 +1,185 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""A simple JSON RPC client.
This client is compatible with any JSON RPC 2.0 implementation, including ours.
"""
from oslo_config import cfg
from oslo_log import log
from oslo_utils import importutils
from oslo_utils import uuidutils
from ironic.common import exception
from ironic.common.i18n import _
from ironic.common import json_rpc
from ironic.common import keystone
CONF = cfg.CONF
LOG = log.getLogger(__name__)
_SESSION = None
def _get_session():
global _SESSION
if _SESSION is None:
if json_rpc.require_authentication():
auth = keystone.get_auth('json_rpc')
else:
auth = None
_SESSION = keystone.get_session('json_rpc', auth=auth)
_SESSION.headers = {
'Content-Type': 'application/json'
}
return _SESSION
class Client(object):
"""JSON RPC client with ironic exception handling."""
def __init__(self, serializer, version_cap=None):
self.serializer = serializer
self.version_cap = version_cap
def can_send_version(self, version):
return _can_send_version(version, self.version_cap)
def prepare(self, topic, version=None):
host = topic.split('.', 1)[1]
return _CallContext(host, self.serializer, version=version,
version_cap=self.version_cap)
class _CallContext(object):
"""Wrapper object for compatibility with oslo.messaging API."""
def __init__(self, host, serializer, version=None, version_cap=None):
self.host = host
self.serializer = serializer
self.version = version
self.version_cap = version_cap
def _handle_error(self, error):
if not error:
return
message = error['message']
try:
cls = error['data']['class']
except KeyError:
LOG.error("Unexpected error from RPC: %s", error)
raise exception.IronicException(
_("Unexpected error raised by RPC"))
else:
if not cls.startswith('ironic.common.exception.'):
# NOTE(dtantsur): protect against arbitrary code execution
LOG.error("Unexpected error from RPC: %s", error)
raise exception.IronicException(
_("Unexpected error raised by RPC"))
raise importutils.import_object(cls, message,
code=error.get('code', 500))
def call(self, context, method, version=None, **kwargs):
"""Call conductor RPC.
Versioned objects are automatically serialized and deserialized.
:param context: Security context.
:param method: Method name.
:param version: RPC API version to use.
:param kwargs: Keyword arguments to pass.
:return: RPC result (if any).
"""
return self._request(context, method, cast=False, version=version,
**kwargs)
def cast(self, context, method, version=None, **kwargs):
"""Call conductor RPC asynchronously.
Versioned objects are automatically serialized and deserialized.
:param context: Security context.
:param method: Method name.
:param version: RPC API version to use.
:param kwargs: Keyword arguments to pass.
:return: None
"""
return self._request(context, method, cast=True, version=version,
**kwargs)
def _request(self, context, method, cast=False, version=None, **kwargs):
"""Call conductor RPC.
Versioned objects are automatically serialized and deserialized.
:param context: Security context.
:param method: Method name.
:param cast: If true, use a JSON RPC notification.
:param version: RPC API version to use.
:param kwargs: Keyword arguments to pass.
:return: RPC result (if any).
"""
params = {key: self.serializer.serialize_entity(context, value)
for key, value in kwargs.items()}
params['context'] = context.to_dict()
if version is None:
version = self.version
if version is not None:
_check_version(version, self.version_cap)
params['rpc.version'] = version
body = {
"jsonrpc": "2.0",
"method": method,
"params": params,
}
if not cast:
body['id'] = context.request_id or uuidutils.generate_uuid()
LOG.debug("RPC %s with %s", method, body)
url = 'http://%s:%d' % (self.host, CONF.json_rpc.port)
result = _get_session().post(url, json=body)
LOG.debug('RPC %s returned %s', method, result.text or '<None>')
if not cast:
result = result.json()
self._handle_error(result.get('error'))
result = self.serializer.deserialize_entity(context,
result['result'])
return result
def _can_send_version(requested, version_cap):
if requested is None or version_cap is None:
return True
requested_parts = [int(item) for item in requested.split('.', 1)]
version_cap_parts = [int(item) for item in version_cap.split('.', 1)]
if requested_parts[0] != version_cap_parts[0]:
return False # major version mismatch
else:
return requested_parts[1] <= version_cap_parts[1]
def _check_version(requested, version_cap):
if not _can_send_version(requested, version_cap):
raise RuntimeError(_("Cannot send RPC request: requested version "
"%(requested)s, maximum allowed version is "
"%(version_cap)s") % {'requested': requested,
'version_cap': version_cap})

View File

@ -0,0 +1,283 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Implementation of JSON RPC for communication between API and conductors.
This module implementa a subset of JSON RPC 2.0 as defined in
https://www.jsonrpc.org/specification. Main differences:
* No support for batched requests.
* No support for positional arguments passing.
* No JSON RPC 1.0 fallback.
"""
import json
from keystonemiddleware import auth_token
from oslo_config import cfg
from oslo_log import log
import oslo_messaging
from oslo_service import service
from oslo_service import wsgi
import webob
from ironic.common import context as ir_context
from ironic.common import exception
from ironic.common.i18n import _
from ironic.common import json_rpc
CONF = cfg.CONF
LOG = log.getLogger(__name__)
_BLACK_LIST = {'init_host', 'del_host', 'target', 'iter_nodes'}
def _build_method_map(manager):
"""Build mapping from method names to their bodies.
:param manager: A conductor manager.
:return: dict with mapping
"""
result = {}
for method in dir(manager):
if method.startswith('_') or method in _BLACK_LIST:
continue
func = getattr(manager, method)
if not callable(func):
continue
LOG.debug('Adding RPC method %s', method)
result[method] = func
return result
class JsonRpcError(exception.IronicException):
pass
class ParseError(JsonRpcError):
code = -32700
_msg_fmt = _("Invalid JSON received by RPC server")
class InvalidRequest(JsonRpcError):
code = -32600
_msg_fmt = _("Invalid request object received by RPC server")
class MethodNotFound(JsonRpcError):
code = -32601
_msg_fmt = _("Method %(name)s was not found")
class InvalidParams(JsonRpcError):
code = -32602
_msg_fmt = _("Params %(params)s are invalid for %(method)s: %(error)s")
class WSGIService(service.Service):
"""Provides ability to launch JSON RPC as a WSGI application."""
def __init__(self, manager, serializer):
self.manager = manager
self.serializer = serializer
self._method_map = _build_method_map(manager)
if json_rpc.require_authentication():
conf = dict(CONF.keystone_authtoken)
app = auth_token.AuthProtocol(self._application, conf)
else:
app = self._application
self.server = wsgi.Server(CONF, 'ironic-json-rpc', app,
host=CONF.json_rpc.host_ip,
port=CONF.json_rpc.port,
use_ssl=CONF.json_rpc.use_ssl)
def _application(self, environment, start_response):
"""WSGI application for conductor JSON RPC."""
request = webob.Request(environment)
if request.method != 'POST':
body = {'error': {'code': 405,
'message': _('Only POST method can be used')}}
return webob.Response(status_code=405, json_body=body)(
environment, start_response)
if json_rpc.require_authentication():
roles = (request.headers.get('X-Roles') or '').split(',')
if 'admin' not in roles:
LOG.debug('Roles %s do not contain "admin", rejecting '
'request', roles)
body = {'error': {'code': 403, 'message': _('Forbidden')}}
return webob.Response(status_code=403, json_body=body)(
environment, start_response)
result = self._call(request)
if result is not None:
response = webob.Response(content_type='application/json',
charset='UTF-8',
json_body=result)
else:
response = webob.Response(status_code=204)
return response(environment, start_response)
def _handle_error(self, exc, request_id=None):
"""Generate a JSON RPC 2.0 error body.
:param exc: Exception object.
:param request_id: ID of the request (if any).
:return: dict with response body
"""
if isinstance(exc, oslo_messaging.ExpectedException):
exc = exc.exc_info[1]
expected = isinstance(exc, exception.IronicException)
cls = exc.__class__
if expected:
LOG.debug('RPC error %s: %s', cls.__name__, exc)
else:
LOG.exception('Unexpected RPC exception %s', cls.__name__)
response = {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": getattr(exc, 'code', 500),
"message": str(exc),
}
}
if expected and not isinstance(exc, JsonRpcError):
# Allow de-serializing the correct class for expected errors.
response['error']['data'] = {
'class': '%s.%s' % (cls.__module__, cls.__name__)
}
return response
def _call(self, request):
"""Process a JSON RPC request.
:param request: ``webob.Request`` object.
:return: dict with response body.
"""
request_id = None
try:
try:
body = json.loads(request.text)
except ValueError:
LOG.error('Cannot parse JSON RPC request as JSON')
raise ParseError()
if not isinstance(body, dict):
LOG.error('JSON RPC request %s is not an object (batched '
'requests are not supported)', body)
raise InvalidRequest()
request_id = body.get('id')
params = body.get('params', {})
if (body.get('jsonrpc') != '2.0'
or not body.get('method')
or not isinstance(params, dict)):
LOG.error('JSON RPC request %s is invalid', body)
raise InvalidRequest()
except Exception as exc:
# We do not treat malformed requests as notifications and return
# a response even when request_id is None. This seems in agreement
# with the examples in the specification.
return self._handle_error(exc, request_id)
try:
method = body['method']
try:
func = self._method_map[method]
except KeyError:
raise MethodNotFound(name=method)
result = self._handle_requests(func, method, params)
if request_id is not None:
return {
"jsonrpc": "2.0",
"result": result,
"id": request_id
}
except Exception as exc:
result = self._handle_error(exc, request_id)
# We treat correctly formed requests without "id" as notifications
# and do not return any errors.
if request_id is not None:
return result
def _handle_requests(self, func, name, params):
"""Convert arguments and call a method.
:param func: Callable object.
:param name: RPC call name for logging.
:param params: Keyword arguments.
:return: call result as JSON.
"""
# TODO(dtantsur): server-side version check?
params.pop('rpc.version', None)
try:
context = params.pop('context')
except KeyError:
context = None
else:
# A valid context is required for deserialization
if not isinstance(context, dict):
raise InvalidParams(
_("Context must be a dictionary, if provided"))
context = ir_context.RequestContext.from_dict(context)
params = {key: self.serializer.deserialize_entity(context, value)
for key, value in params.items()}
params['context'] = context
LOG.debug('RPC %s with %s', name, params)
try:
result = func(**params)
# FIXME(dtantsur): we could use the inspect module, but
# oslo_messaging.expected_exceptions messes up signatures.
except TypeError as exc:
raise InvalidParams(params=', '.join(params),
method=name, error=exc)
if context is not None:
# Currently it seems that we can serialize even with invalid
# context, but I'm not sure it's guaranteed to be the case.
result = self.serializer.serialize_entity(context, result)
LOG.debug('RPC %s returned %s', name, result)
return result
def start(self):
"""Start serving this service using loaded configuration.
:returns: None
"""
self.server.start()
def stop(self):
"""Stop serving this API.
:returns: None
"""
self.server.stop()
def wait(self):
"""Wait for the service to stop serving this API.
:returns: None
"""
self.server.wait()
def reset(self):
"""Reset server greenpool size to default.
:returns: None
"""
self.server.reset()

View File

@ -16,16 +16,19 @@
import signal
from oslo_config import cfg
from oslo_log import log
import oslo_messaging as messaging
from oslo_service import service
from oslo_utils import importutils
from ironic.common import context
from ironic.common.json_rpc import server as json_rpc
from ironic.common import rpc
from ironic.objects import base as objects_base
LOG = log.getLogger(__name__)
CONF = cfg.CONF
class RPCService(service.Service):
@ -44,10 +47,14 @@ class RPCService(service.Service):
super(RPCService, self).start()
admin_context = context.get_admin_context()
target = messaging.Target(topic=self.topic, server=self.host)
endpoints = [self.manager]
serializer = objects_base.IronicObjectSerializer(is_server=True)
self.rpcserver = rpc.get_server(target, endpoints, serializer)
if CONF.rpc_transport == 'json-rpc':
self.rpcserver = json_rpc.WSGIService(self.manager,
serializer)
else:
target = messaging.Target(topic=self.topic, server=self.host)
endpoints = [self.manager]
self.rpcserver = rpc.get_server(target, endpoints, serializer)
self.rpcserver.start()
self.handle_signal()

View File

@ -25,6 +25,7 @@ import oslo_messaging as messaging
from ironic.common import exception
from ironic.common import hash_ring
from ironic.common.i18n import _
from ironic.common.json_rpc import client as json_rpc
from ironic.common import release_mappings as versions
from ironic.common import rpc
from ironic.conductor import manager
@ -112,14 +113,19 @@ class ConductorAPI(object):
if self.topic is None:
self.topic = manager.MANAGER_TOPIC
target = messaging.Target(topic=self.topic,
version='1.0')
serializer = objects_base.IronicObjectSerializer()
release_ver = versions.RELEASE_MAPPING.get(CONF.pin_release_version)
version_cap = (release_ver['rpc'] if release_ver
else self.RPC_API_VERSION)
self.client = rpc.get_client(target, version_cap=version_cap,
serializer=serializer)
if CONF.rpc_transport == 'json-rpc':
self.client = json_rpc.Client(serializer=serializer,
version_cap=version_cap)
self.topic = ''
else:
target = messaging.Target(topic=self.topic, version='1.0')
self.client = rpc.get_client(target, version_cap=version_cap,
serializer=serializer)
use_groups = self.client.can_send_version('1.47')
# NOTE(deva): this is going to be buggy

View File

@ -35,6 +35,7 @@ from ironic.conf import inspector
from ironic.conf import ipmi
from ironic.conf import irmc
from ironic.conf import iscsi
from ironic.conf import json_rpc
from ironic.conf import metrics
from ironic.conf import metrics_statsd
from ironic.conf import neutron
@ -67,6 +68,7 @@ inspector.register_opts(CONF)
ipmi.register_opts(CONF)
irmc.register_opts(CONF)
iscsi.register_opts(CONF)
json_rpc.register_opts(CONF)
metrics.register_opts(CONF)
metrics_statsd.register_opts(CONF)
neutron.register_opts(CONF)

View File

@ -330,6 +330,12 @@ service_opts = [
'When doing a rolling upgrade from version N to version '
'N+1, set (to pin) this to N. To unpin (default), leave '
'it unset and the latest versions will be used.')),
cfg.StrOpt('rpc_transport',
default='oslo',
choices=[('oslo', _('use oslo.messaging transport')),
('json-rpc', _('use JSON RPC transport'))],
help=_('Which RPC transport implementation to use between '
'conductor and API services')),
]
utils_opts = [

44
ironic/conf/json_rpc.py Normal file
View File

@ -0,0 +1,44 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from ironic.common.i18n import _
from ironic.conf import auth
opts = [
cfg.StrOpt('auth_strategy',
choices=[('noauth', _('no authentication')),
('keystone', _('use the Identity service for '
'authentication'))],
help=_('Authentication strategy used by JSON RPC. Defaults to '
'the global auth_strategy setting.')),
cfg.HostAddressOpt('host_ip',
default='0.0.0.0',
help=_('The IP address or hostname on which JSON RPC '
'will listen.')),
cfg.PortOpt('port',
default=8089,
help=_('The port to use for JSON RPC')),
cfg.BoolOpt('use_ssl',
default=False,
help=_('Whether to use TLS for JSON RPC')),
]
def register_opts(conf):
conf.register_opts(opts, group='json_rpc')
auth.register_auth_opts(conf, 'json_rpc')
def list_opts():
return opts + auth.add_auth_opts([])

View File

@ -53,6 +53,7 @@ _opts = [
('ipmi', ironic.conf.ipmi.opts),
('irmc', ironic.conf.irmc.opts),
('iscsi', ironic.conf.iscsi.opts),
('json_rpc', ironic.conf.json_rpc.list_opts()),
('metrics', ironic.conf.metrics.opts),
('metrics_statsd', ironic.conf.metrics_statsd.opts),
('neutron', ironic.conf.neutron.list_opts()),

View File

@ -0,0 +1,495 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import fixtures
import mock
import oslo_messaging
import webob
from ironic.common import context as ir_ctx
from ironic.common import exception
from ironic.common.json_rpc import client
from ironic.common.json_rpc import server
from ironic import objects
from ironic.objects import base as objects_base
from ironic.tests import base as test_base
from ironic.tests.unit.objects import utils as obj_utils
class FakeManager(object):
def success(self, context, x, y=0):
assert isinstance(context, ir_ctx.RequestContext)
assert context.user_name == 'admin'
return x - y
def with_node(self, context, node):
assert isinstance(context, ir_ctx.RequestContext)
assert isinstance(node, objects.Node)
node.extra['answer'] = 42
return node
def no_result(self, context):
assert isinstance(context, ir_ctx.RequestContext)
return None
def no_context(self):
return 42
def fail(self, context, message):
assert isinstance(context, ir_ctx.RequestContext)
raise exception.IronicException(message)
@oslo_messaging.expected_exceptions(exception.Invalid)
def expected(self, context, message):
assert isinstance(context, ir_ctx.RequestContext)
raise exception.Invalid(message)
def crash(self, context):
raise RuntimeError('boom')
def init_host(self, context):
assert False, "This should not be exposed"
def _private(self, context):
assert False, "This should not be exposed"
# This should not be exposed either
value = 42
class TestService(test_base.TestCase):
def setUp(self):
super(TestService, self).setUp()
self.config(auth_strategy='noauth', group='json_rpc')
self.server_mock = self.useFixture(fixtures.MockPatch(
'oslo_service.wsgi.Server', autospec=True)).mock
self.serializer = objects_base.IronicObjectSerializer(is_server=True)
self.service = server.WSGIService(FakeManager(), self.serializer)
self.app = self.service._application
self.ctx = {'user_name': 'admin'}
def _request(self, name=None, params=None, expected_error=None,
request_id='abcd', **kwargs):
body = {
'jsonrpc': '2.0',
}
if request_id is not None:
body['id'] = request_id
if name is not None:
body['method'] = name
if params is not None:
body['params'] = params
if 'json_body' not in kwargs:
kwargs['json_body'] = body
kwargs.setdefault('method', 'POST')
kwargs.setdefault('headers', {'Content-Type': 'application/json'})
request = webob.Request.blank("/", **kwargs)
response = request.get_response(self.app)
self.assertEqual(response.status_code,
expected_error or (200 if request_id else 204))
if request_id is not None:
if expected_error:
self.assertEqual(expected_error,
response.json_body['error']['code'])
else:
return response.json_body
else:
self.assertFalse(response.text)
def _check(self, body, result=None, error=None, request_id='abcd'):
self.assertEqual('2.0', body.pop('jsonrpc'))
self.assertEqual(request_id, body.pop('id'))
if error is not None:
self.assertEqual({'error': error}, body)
else:
self.assertEqual({'result': result}, body)
def test_success(self):
body = self._request('success', {'context': self.ctx, 'x': 42})
self._check(body, result=42)
def test_success_no_result(self):
body = self._request('no_result', {'context': self.ctx})
self._check(body, result=None)
def test_notification(self):
body = self._request('no_result', {'context': self.ctx},
request_id=None)
self.assertIsNone(body)
def test_no_context(self):
body = self._request('no_context')
self._check(body, result=42)
def test_serialize_objects(self):
node = obj_utils.get_test_node(self.context)
node = self.serializer.serialize_entity(self.context, node)
body = self._request('with_node', {'context': self.ctx, 'node': node})
self.assertNotIn('error', body)
self.assertIsInstance(body['result'], dict)
node = self.serializer.deserialize_entity(self.context, body['result'])
self.assertEqual({'answer': 42}, node.extra)
def test_non_json_body(self):
for body in (b'', b'???', b"\xc3\x28"):
request = webob.Request.blank("/", method='POST', body=body)
response = request.get_response(self.app)
self._check(
response.json_body,
error={
'message': server.ParseError._msg_fmt,
'code': -32700,
},
request_id=None)
def test_invalid_requests(self):
bodies = [
# Invalid requests with request ID.
{'method': 'no_result', 'id': 'abcd',
'params': {'context': self.ctx}},
{'jsonrpc': '2.0', 'id': 'abcd', 'params': {'context': self.ctx}},
# These do not count as notifications, since they're malformed.
{'method': 'no_result', 'params': {'context': self.ctx}},
{'jsonrpc': '2.0', 'params': {'context': self.ctx}},
42,
# We do not implement batched requests.
[],
[{'jsonrpc': '2.0', 'method': 'no_result',
'params': {'context': self.ctx}}],
]
for body in bodies:
body = self._request(json_body=body)
self._check(
body,
error={
'message': server.InvalidRequest._msg_fmt,
'code': -32600,
},
request_id=body.get('id'))
def test_malformed_context(self):
body = self._request(json_body={'jsonrpc': '2.0', 'id': 'abcd',
'method': 'no_result',
'params': {'context': 42}})
self._check(
body,
error={
'message': 'Context must be a dictionary, if provided',
'code': -32602,
})
def test_expected_failure(self):
body = self._request('fail', {'context': self.ctx,
'message': 'some error'})
self._check(body,
error={
'message': 'some error',
'code': 500,
'data': {
'class': 'ironic.common.exception.IronicException'
}
})
def test_expected_failure_oslo(self):
# Check that exceptions wrapped by oslo's expected_exceptions get
# unwrapped correctly.
body = self._request('expected', {'context': self.ctx,
'message': 'some error'})
self._check(body,
error={
'message': 'some error',
'code': 400,
'data': {
'class': 'ironic.common.exception.Invalid'
}
})
@mock.patch.object(server.LOG, 'exception', autospec=True)
def test_unexpected_failure(self, mock_log):
body = self._request('crash', {'context': self.ctx})
self._check(body,
error={
'message': 'boom',
'code': 500,
})
self.assertTrue(mock_log.called)
def test_method_not_found(self):
body = self._request('banana', {'context': self.ctx})
self._check(body,
error={
'message': 'Method banana was not found',
'code': -32601,
})
def test_no_blacklisted_methods(self):
for name in ('__init__', '_private', 'init_host', 'value'):
body = self._request(name, {'context': self.ctx})
self._check(body,
error={
'message': 'Method %s was not found' % name,
'code': -32601,
})
def test_missing_argument(self):
body = self._request('success', {'context': self.ctx})
# The exact error message depends on the Python version
self.assertEqual(-32602, body['error']['code'])
self.assertNotIn('result', body)
def test_method_not_post(self):
self._request('success', {'context': self.ctx, 'x': 42},
method='GET', expected_error=405)
def test_authenticated(self):
self.config(auth_strategy='keystone', group='json_rpc')
self.service = server.WSGIService(FakeManager(), self.serializer)
self.app = self.server_mock.call_args[0][2]
self._request('success', {'context': self.ctx, 'x': 42},
expected_error=401)
def test_authenticated_no_admin_role(self):
self.config(auth_strategy='keystone', group='json_rpc')
self._request('success', {'context': self.ctx, 'x': 42},
expected_error=403)
@mock.patch.object(client, '_get_session', autospec=True)
class TestClient(test_base.TestCase):
def setUp(self):
super(TestClient, self).setUp()
self.serializer = objects_base.IronicObjectSerializer(is_server=True)
self.client = client.Client(self.serializer)
self.ctx_json = self.context.to_dict()
def test_can_send_version(self, mock_session):
self.assertTrue(self.client.can_send_version('1.42'))
self.client = client.Client(self.serializer, version_cap='1.42')
self.assertTrue(self.client.can_send_version('1.42'))
self.assertTrue(self.client.can_send_version('1.0'))
self.assertFalse(self.client.can_send_version('1.99'))
self.assertFalse(self.client.can_send_version('2.0'))
def test_call_success(self, mock_session):
response = mock_session.return_value.post.return_value
response.json.return_value = {
'jsonrpc': '2.0',
'result': 42
}
cctx = self.client.prepare('foo.example.com')
self.assertEqual('example.com', cctx.host)
result = cctx.call(self.context, 'do_something', answer=42)
self.assertEqual(42, result)
mock_session.return_value.post.assert_called_once_with(
'http://example.com:8089',
json={'jsonrpc': '2.0',
'method': 'do_something',
'params': {'answer': 42, 'context': self.ctx_json},
'id': self.context.request_id})
def test_call_success_with_version(self, mock_session):
response = mock_session.return_value.post.return_value
response.json.return_value = {
'jsonrpc': '2.0',
'result': 42
}
cctx = self.client.prepare('foo.example.com', version='1.42')
self.assertEqual('example.com', cctx.host)
result = cctx.call(self.context, 'do_something', answer=42)
self.assertEqual(42, result)
mock_session.return_value.post.assert_called_once_with(
'http://example.com:8089',
json={'jsonrpc': '2.0',
'method': 'do_something',
'params': {'answer': 42, 'context': self.ctx_json,
'rpc.version': '1.42'},
'id': self.context.request_id})
def test_call_success_with_version_and_cap(self, mock_session):
self.client = client.Client(self.serializer, version_cap='1.99')
response = mock_session.return_value.post.return_value
response.json.return_value = {
'jsonrpc': '2.0',
'result': 42
}
cctx = self.client.prepare('foo.example.com', version='1.42')
self.assertEqual('example.com', cctx.host)
result = cctx.call(self.context, 'do_something', answer=42)
self.assertEqual(42, result)
mock_session.return_value.post.assert_called_once_with(
'http://example.com:8089',
json={'jsonrpc': '2.0',
'method': 'do_something',
'params': {'answer': 42, 'context': self.ctx_json,
'rpc.version': '1.42'},
'id': self.context.request_id})
def test_cast_success(self, mock_session):
cctx = self.client.prepare('foo.example.com')
self.assertEqual('example.com', cctx.host)
result = cctx.cast(self.context, 'do_something', answer=42)
self.assertIsNone(result)
mock_session.return_value.post.assert_called_once_with(
'http://example.com:8089',
json={'jsonrpc': '2.0',
'method': 'do_something',
'params': {'answer': 42, 'context': self.ctx_json}})
def test_cast_success_with_version(self, mock_session):
cctx = self.client.prepare('foo.example.com', version='1.42')
self.assertEqual('example.com', cctx.host)
result = cctx.cast(self.context, 'do_something', answer=42)
self.assertIsNone(result)
mock_session.return_value.post.assert_called_once_with(
'http://example.com:8089',
json={'jsonrpc': '2.0',
'method': 'do_something',
'params': {'answer': 42, 'context': self.ctx_json,
'rpc.version': '1.42'}})
def test_call_serialization(self, mock_session):
node = obj_utils.get_test_node(self.context)
node_json = self.serializer.serialize_entity(self.context, node)
response = mock_session.return_value.post.return_value
response.json.return_value = {
'jsonrpc': '2.0',
'result': node_json
}
cctx = self.client.prepare('foo.example.com')
self.assertEqual('example.com', cctx.host)
result = cctx.call(self.context, 'do_something', node=node)
self.assertIsInstance(result, objects.Node)
self.assertEqual(result.uuid, node.uuid)
mock_session.return_value.post.assert_called_once_with(
'http://example.com:8089',
json={'jsonrpc': '2.0',
'method': 'do_something',
'params': {'node': node_json, 'context': self.ctx_json},
'id': self.context.request_id})
def test_call_failure(self, mock_session):
response = mock_session.return_value.post.return_value
response.json.return_value = {
'jsonrpc': '2.0',
'error': {
'code': 418,
'message': 'I am a teapot',
'data': {
'class': 'ironic.common.exception.Invalid'
}
}
}
cctx = self.client.prepare('foo.example.com')
self.assertEqual('example.com', cctx.host)
# Make sure that the class is restored correctly for expected errors.
exc = self.assertRaises(exception.Invalid,
cctx.call,
self.context, 'do_something', answer=42)
# Code from the body has priority over one in the class.
self.assertEqual(418, exc.code)
self.assertIn('I am a teapot', str(exc))
mock_session.return_value.post.assert_called_once_with(
'http://example.com:8089',
json={'jsonrpc': '2.0',
'method': 'do_something',
'params': {'answer': 42, 'context': self.ctx_json},
'id': self.context.request_id})
def test_call_unexpected_failure(self, mock_session):
response = mock_session.return_value.post.return_value
response.json.return_value = {
'jsonrpc': '2.0',
'error': {
'code': 500,
'message': 'AttributeError',
}
}
cctx = self.client.prepare('foo.example.com')
self.assertEqual('example.com', cctx.host)
exc = self.assertRaises(exception.IronicException,
cctx.call,
self.context, 'do_something', answer=42)
self.assertEqual(500, exc.code)
self.assertIn('Unexpected error', str(exc))
mock_session.return_value.post.assert_called_once_with(
'http://example.com:8089',
json={'jsonrpc': '2.0',
'method': 'do_something',
'params': {'answer': 42, 'context': self.ctx_json},
'id': self.context.request_id})
def test_call_failure_with_foreign_class(self, mock_session):
# This should not happen, but provide an additional safeguard
response = mock_session.return_value.post.return_value
response.json.return_value = {
'jsonrpc': '2.0',
'error': {
'code': 500,
'message': 'AttributeError',
'data': {
'class': 'AttributeError'
}
}
}
cctx = self.client.prepare('foo.example.com')
self.assertEqual('example.com', cctx.host)
exc = self.assertRaises(exception.IronicException,
cctx.call,
self.context, 'do_something', answer=42)
self.assertEqual(500, exc.code)
self.assertIn('Unexpected error', str(exc))
mock_session.return_value.post.assert_called_once_with(
'http://example.com:8089',
json={'jsonrpc': '2.0',
'method': 'do_something',
'params': {'answer': 42, 'context': self.ctx_json},
'id': self.context.request_id})
def test_cast_failure(self, mock_session):
# Cast cannot return normal failures, but make sure we ignore them even
# if server sends something in violation of the protocol (or because
# it's a low-level error like HTTP Forbidden).
response = mock_session.return_value.post.return_value
response.json.return_value = {
'jsonrpc': '2.0',
'error': {
'code': 418,
'message': 'I am a teapot',
'data': {
'class': 'ironic.common.exception.IronicException'
}
}
}
cctx = self.client.prepare('foo.example.com')
self.assertEqual('example.com', cctx.host)
result = cctx.cast(self.context, 'do_something', answer=42)
self.assertIsNone(result)
mock_session.return_value.post.assert_called_once_with(
'http://example.com:8089',
json={'jsonrpc': '2.0',
'method': 'do_something',
'params': {'answer': 42, 'context': self.ctx_json}})
def test_call_failure_with_version_and_cap(self, mock_session):
self.client = client.Client(self.serializer, version_cap='1.42')
cctx = self.client.prepare('foo.example.com', version='1.99')
self.assertRaisesRegex(RuntimeError,
"requested version 1.99, maximum allowed "
"version is 1.42",
cctx.call, self.context, 'do_something',
answer=42)
self.assertFalse(mock_session.return_value.post.called)

View File

@ -0,0 +1,8 @@
---
features:
- |
Adds the ability to use JSON RPC for communication between API and
conductor services. To use it set the new ``rpc_transport`` configuration
options to ``json-rpc`` and configure the credentials and the ``host_ip``
in the ``json_rpc`` section. Hostnames of all conductors must be
resolvable for this implementation to work.

View File

@ -92,6 +92,7 @@
IRONIC_AUTOMATED_CLEAN_ENABLED: False
IRONIC_DEFAULT_DEPLOY_INTERFACE: direct
IRONIC_ENABLED_DEPLOY_INTERFACES: "iscsi,direct,ansible"
IRONIC_RPC_TRANSPORT: json-rpc
IRONIC_VM_COUNT: 6
SWIFT_ENABLE_TEMPURLS: True
SWIFT_TEMPURL_KEY: secretkey
@ -269,7 +270,9 @@
IRONIC_DEFAULT_NETWORK_INTERFACE: noop
IRONIC_TEMPEST_WHOLE_DISK_IMAGE: True
IRONIC_VM_EPHEMERAL_DISK: 0
IRONIC_RPC_TRANSPORT: json-rpc
devstack_services:
rabbit: False
g-api: False
g-reg: False
n-api: False