Support using JSON-RPC instead of oslo.messaging

Using AMQP-based RPC can be an overkill in case of ironic, especially
when used standalone. This change allows using a built-in JSON RPC
implementation instead.

This implementation does not currently support the object indirection
API, which does not seem to be used anywhere anyway.

The standalone and API test jobs are changed to use JSON RPC.

Story: #2004874
Task: #29141
Change-Id: I7cc95935d6bdae43fab5dfbd544c8e6d65fcc38d
This commit is contained in:
Dmitry Tantsur 2019-01-24 13:22:34 +01:00
parent e34941c327
commit 7efbbcc2d9
16 changed files with 1177 additions and 9 deletions

View File

@ -322,6 +322,10 @@ IRONIC_IPXE_USE_SWIFT=$(trueorfalse False IRONIC_IPXE_USE_SWIFT)
IRONIC_HTTP_DIR=${IRONIC_HTTP_DIR:-$IRONIC_DATA_DIR/httpboot}
IRONIC_HTTP_PORT=${IRONIC_HTTP_PORT:-3928}
# Allow using JSON RPC instead of oslo.messaging
IRONIC_RPC_TRANSPORT=${IRONIC_RPC_TRANSPORT:-oslo}
IRONIC_JSON_RPC_PORT=${IRONIC_JSON_RPC_PORT:-8089}
# Whether DevStack will be setup for bare metal or VMs
IRONIC_IS_HARDWARE=$(trueorfalse False IRONIC_IS_HARDWARE)
@ -1107,6 +1111,9 @@ function configure_ironic {
iniset $IRONIC_CONF_FILE agent deploy_logs_local_path $IRONIC_DEPLOY_LOGS_LOCAL_PATH
# Set image_download_source for direct interface
iniset $IRONIC_CONF_FILE agent image_download_source $IRONIC_AGENT_IMAGE_DOWNLOAD_SOURCE
# Configure JSON RPC backend
iniset $IRONIC_CONF_FILE DEFAULT rpc_transport $IRONIC_RPC_TRANSPORT
iniset $IRONIC_CONF_FILE json_rpc port $IRONIC_JSON_RPC_PORT
# Configure Ironic conductor, if it was enabled.
if is_service_enabled ir-cond; then
@ -1171,7 +1178,9 @@ function configure_ironic_api {
iniset $IRONIC_CONF_FILE DEFAULT auth_strategy $IRONIC_AUTH_STRATEGY
configure_auth_token_middleware $IRONIC_CONF_FILE ironic $IRONIC_AUTH_CACHE_DIR/api
iniset_rpc_backend ironic $IRONIC_CONF_FILE
if is_service_enabled rabbit; then
iniset_rpc_backend ironic $IRONIC_CONF_FILE
fi
iniset $IRONIC_CONF_FILE conductor automated_clean $IRONIC_AUTOMATED_CLEAN_ENABLED
@ -1212,7 +1221,7 @@ function configure_ironic_conductor {
# NOTE(pas-ha) service_catalog section is used to discover
# ironic API endpoint from keystone catalog
local client_sections="neutron swift glance inspector cinder service_catalog"
local client_sections="neutron swift glance inspector cinder service_catalog json_rpc"
for conf_section in $client_sections; do
configure_client_for $conf_section
done

View File

@ -29,6 +29,47 @@ Configuring ironic-api service
# configuration. (string value)
transport_url = rabbit://RPC_USER:RPC_PASSWORD@RPC_HOST:RPC_PORT/
Alternatively, you can use JSON RPC for interactions between
ironic-conductor and ironic-api. Enable it in the configuration and provide
the keystone credentials to use for authentication:
.. code-block:: ini
[DEFAULT]
rpc_transport = json-rpc
[json_rpc]
# Authentication type to load (string value)
auth_type = password
# Authentication URL (string value)
auth_url=https://IDENTITY_IP:5000/
# Username (string value)
username=ironic
# User's password (string value)
password=IRONIC_PASSWORD
# Project name to scope to (string value)
project_name=service
# Domain ID containing project (string value)
project_domain_id=default
# User's domain id (string value)
user_domain_id=default
If you use port other than the default 8089 for JSON RPC, you have to
configure it, for example:
.. code-block:: ini
[json_rpc]
port = 9999
#. Configure the ironic-api service to use these credentials with the Identity
service. Replace ``PUBLIC_IDENTITY_IP`` with the public IP of the Identity
server, ``PRIVATE_IDENTITY_IP`` with the private IP of the Identity server

View File

@ -42,6 +42,56 @@ Configuring ironic-conductor service
# configuration. (string value)
transport_url = rabbit://RPC_USER:RPC_PASSWORD@RPC_HOST:RPC_PORT/
Alternatively, you can use JSON RPC for interactions between
ironic-conductor and ironic-api. Enable it in the configuration and provide
the keystone credentials to use for authenticating incoming requests (can
be the same as for the API):
.. code-block:: ini
[DEFAULT]
rpc_transport = json-rpc
[keystone_authtoken]
# Authentication type to load (string value)
auth_type=password
# Complete public Identity API endpoint (string value)
www_authenticate_uri=http://PUBLIC_IDENTITY_IP:5000
# Complete admin Identity API endpoint. (string value)
auth_url=http://PRIVATE_IDENTITY_IP:5000
# Service username. (string value)
username=ironic
# Service account password. (string value)
password=IRONIC_PASSWORD
# Service tenant name. (string value)
project_name=service
# Domain name containing project (string value)
project_domain_name=Default
# User's domain name (string value)
user_domain_name=Default
You can optionally change the host and the port the JSON RPC service will
bind to, for example:
.. code-block:: ini
[json_rpc]
host_ip = 192.168.0.10
port = 9999
.. warning::
Hostnames of ironic-conductor machines must be resolvable by ironic-api
services when JSON RPC is used.
#. Configure credentials for accessing other OpenStack services.
In order to communicate with other OpenStack services, the Bare Metal

View File

@ -30,6 +30,14 @@ You should make the following changes to ``/etc/ironic/ironic.conf``:
Networking since it will do all the dynamically changing configurations
for you.
#. If you want to disable using a messaging broker between conductor and API
processes, switch to JSON RPC instead:
.. code-block:: ini
[DEFAULT]
rpc_transport = json-rpc
If you don't use Image service, it's possible to provide images to Bare Metal
service via a URL.

View File

@ -0,0 +1,20 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
CONF = cfg.CONF
def require_authentication():
return (CONF.json_rpc.auth_strategy or CONF.auth_strategy) == 'keystone'

View File

@ -0,0 +1,185 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""A simple JSON RPC client.
This client is compatible with any JSON RPC 2.0 implementation, including ours.
"""
from oslo_config import cfg
from oslo_log import log
from oslo_utils import importutils
from oslo_utils import uuidutils
from ironic.common import exception
from ironic.common.i18n import _
from ironic.common import json_rpc
from ironic.common import keystone
CONF = cfg.CONF
LOG = log.getLogger(__name__)
_SESSION = None
def _get_session():
global _SESSION
if _SESSION is None:
if json_rpc.require_authentication():
auth = keystone.get_auth('json_rpc')
else:
auth = None
_SESSION = keystone.get_session('json_rpc', auth=auth)
_SESSION.headers = {
'Content-Type': 'application/json'
}
return _SESSION
class Client(object):
"""JSON RPC client with ironic exception handling."""
def __init__(self, serializer, version_cap=None):
self.serializer = serializer
self.version_cap = version_cap
def can_send_version(self, version):
return _can_send_version(version, self.version_cap)
def prepare(self, topic, version=None):
host = topic.split('.', 1)[1]
return _CallContext(host, self.serializer, version=version,
version_cap=self.version_cap)
class _CallContext(object):
"""Wrapper object for compatibility with oslo.messaging API."""
def __init__(self, host, serializer, version=None, version_cap=None):
self.host = host
self.serializer = serializer
self.version = version
self.version_cap = version_cap
def _handle_error(self, error):
if not error:
return
message = error['message']
try:
cls = error['data']['class']
except KeyError:
LOG.error("Unexpected error from RPC: %s", error)
raise exception.IronicException(
_("Unexpected error raised by RPC"))
else:
if not cls.startswith('ironic.common.exception.'):
# NOTE(dtantsur): protect against arbitrary code execution
LOG.error("Unexpected error from RPC: %s", error)
raise exception.IronicException(
_("Unexpected error raised by RPC"))
raise importutils.import_object(cls, message,
code=error.get('code', 500))
def call(self, context, method, version=None, **kwargs):
"""Call conductor RPC.
Versioned objects are automatically serialized and deserialized.
:param context: Security context.
:param method: Method name.
:param version: RPC API version to use.
:param kwargs: Keyword arguments to pass.
:return: RPC result (if any).
"""
return self._request(context, method, cast=False, version=version,
**kwargs)
def cast(self, context, method, version=None, **kwargs):
"""Call conductor RPC asynchronously.
Versioned objects are automatically serialized and deserialized.
:param context: Security context.
:param method: Method name.
:param version: RPC API version to use.
:param kwargs: Keyword arguments to pass.
:return: None
"""
return self._request(context, method, cast=True, version=version,
**kwargs)
def _request(self, context, method, cast=False, version=None, **kwargs):
"""Call conductor RPC.
Versioned objects are automatically serialized and deserialized.
:param context: Security context.
:param method: Method name.
:param cast: If true, use a JSON RPC notification.
:param version: RPC API version to use.
:param kwargs: Keyword arguments to pass.
:return: RPC result (if any).
"""
params = {key: self.serializer.serialize_entity(context, value)
for key, value in kwargs.items()}
params['context'] = context.to_dict()
if version is None:
version = self.version
if version is not None:
_check_version(version, self.version_cap)
params['rpc.version'] = version
body = {
"jsonrpc": "2.0",
"method": method,
"params": params,
}
if not cast:
body['id'] = context.request_id or uuidutils.generate_uuid()
LOG.debug("RPC %s with %s", method, body)
url = 'http://%s:%d' % (self.host, CONF.json_rpc.port)
result = _get_session().post(url, json=body)
LOG.debug('RPC %s returned %s', method, result.text or '<None>')
if not cast:
result = result.json()
self._handle_error(result.get('error'))
result = self.serializer.deserialize_entity(context,
result['result'])
return result
def _can_send_version(requested, version_cap):
if requested is None or version_cap is None:
return True
requested_parts = [int(item) for item in requested.split('.', 1)]
version_cap_parts = [int(item) for item in version_cap.split('.', 1)]
if requested_parts[0] != version_cap_parts[0]:
return False # major version mismatch
else:
return requested_parts[1] <= version_cap_parts[1]
def _check_version(requested, version_cap):
if not _can_send_version(requested, version_cap):
raise RuntimeError(_("Cannot send RPC request: requested version "
"%(requested)s, maximum allowed version is "
"%(version_cap)s") % {'requested': requested,
'version_cap': version_cap})

View File

@ -0,0 +1,283 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Implementation of JSON RPC for communication between API and conductors.
This module implementa a subset of JSON RPC 2.0 as defined in
https://www.jsonrpc.org/specification. Main differences:
* No support for batched requests.
* No support for positional arguments passing.
* No JSON RPC 1.0 fallback.
"""
import json
from keystonemiddleware import auth_token
from oslo_config import cfg
from oslo_log import log
import oslo_messaging
from oslo_service import service
from oslo_service import wsgi
import webob
from ironic.common import context as ir_context
from ironic.common import exception
from ironic.common.i18n import _
from ironic.common import json_rpc
CONF = cfg.CONF
LOG = log.getLogger(__name__)
_BLACK_LIST = {'init_host', 'del_host', 'target', 'iter_nodes'}
def _build_method_map(manager):
"""Build mapping from method names to their bodies.
:param manager: A conductor manager.
:return: dict with mapping
"""
result = {}
for method in dir(manager):
if method.startswith('_') or method in _BLACK_LIST:
continue
func = getattr(manager, method)
if not callable(func):
continue
LOG.debug('Adding RPC method %s', method)
result[method] = func
return result
class JsonRpcError(exception.IronicException):
pass
class ParseError(JsonRpcError):
code = -32700
_msg_fmt = _("Invalid JSON received by RPC server")
class InvalidRequest(JsonRpcError):
code = -32600
_msg_fmt = _("Invalid request object received by RPC server")
class MethodNotFound(JsonRpcError):
code = -32601
_msg_fmt = _("Method %(name)s was not found")
class InvalidParams(JsonRpcError):
code = -32602
_msg_fmt = _("Params %(params)s are invalid for %(method)s: %(error)s")
class WSGIService(service.Service):
"""Provides ability to launch JSON RPC as a WSGI application."""
def __init__(self, manager, serializer):
self.manager = manager
self.serializer = serializer
self._method_map = _build_method_map(manager)
if json_rpc.require_authentication():
conf = dict(CONF.keystone_authtoken)
app = auth_token.AuthProtocol(self._application, conf)
else:
app = self._application
self.server = wsgi.Server(CONF, 'ironic-json-rpc', app,
host=CONF.json_rpc.host_ip,
port=CONF.json_rpc.port,
use_ssl=CONF.json_rpc.use_ssl)
def _application(self, environment, start_response):
"""WSGI application for conductor JSON RPC."""
request = webob.Request(environment)
if request.method != 'POST':
body = {'error': {'code': 405,
'message': _('Only POST method can be used')}}
return webob.Response(status_code=405, json_body=body)(
environment, start_response)
if json_rpc.require_authentication():
roles = (request.headers.get('X-Roles') or '').split(',')
if 'admin' not in roles:
LOG.debug('Roles %s do not contain "admin", rejecting '
'request', roles)
body = {'error': {'code': 403, 'message': _('Forbidden')}}
return webob.Response(status_code=403, json_body=body)(
environment, start_response)
result = self._call(request)
if result is not None:
response = webob.Response(content_type='application/json',
charset='UTF-8',
json_body=result)
else:
response = webob.Response(status_code=204)
return response(environment, start_response)
def _handle_error(self, exc, request_id=None):
"""Generate a JSON RPC 2.0 error body.
:param exc: Exception object.
:param request_id: ID of the request (if any).
:return: dict with response body
"""
if isinstance(exc, oslo_messaging.ExpectedException):
exc = exc.exc_info[1]
expected = isinstance(exc, exception.IronicException)
cls = exc.__class__
if expected:
LOG.debug('RPC error %s: %s', cls.__name__, exc)
else:
LOG.exception('Unexpected RPC exception %s', cls.__name__)
response = {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": getattr(exc, 'code', 500),
"message": str(exc),
}
}
if expected and not isinstance(exc, JsonRpcError):
# Allow de-serializing the correct class for expected errors.
response['error']['data'] = {
'class': '%s.%s' % (cls.__module__, cls.__name__)
}
return response
def _call(self, request):
"""Process a JSON RPC request.
:param request: ``webob.Request`` object.
:return: dict with response body.
"""
request_id = None
try:
try:
body = json.loads(request.text)
except ValueError:
LOG.error('Cannot parse JSON RPC request as JSON')
raise ParseError()
if not isinstance(body, dict):
LOG.error('JSON RPC request %s is not an object (batched '
'requests are not supported)', body)
raise InvalidRequest()
request_id = body.get('id')
params = body.get('params', {})
if (body.get('jsonrpc') != '2.0'
or not body.get('method')
or not isinstance(params, dict)):
LOG.error('JSON RPC request %s is invalid', body)
raise InvalidRequest()
except Exception as exc:
# We do not treat malformed requests as notifications and return
# a response even when request_id is None. This seems in agreement
# with the examples in the specification.
return self._handle_error(exc, request_id)
try:
method = body['method']
try:
func = self._method_map[method]
except KeyError:
raise MethodNotFound(name=method)
result = self._handle_requests(func, method, params)
if request_id is not None:
return {
"jsonrpc": "2.0",
"result": result,
"id": request_id
}
except Exception as exc:
result = self._handle_error(exc, request_id)
# We treat correctly formed requests without "id" as notifications
# and do not return any errors.
if request_id is not None:
return result
def _handle_requests(self, func, name, params):
"""Convert arguments and call a method.
:param func: Callable object.
:param name: RPC call name for logging.
:param params: Keyword arguments.
:return: call result as JSON.
"""
# TODO(dtantsur): server-side version check?
params.pop('rpc.version', None)
try:
context = params.pop('context')
except KeyError:
context = None
else:
# A valid context is required for deserialization
if not isinstance(context, dict):
raise InvalidParams(
_("Context must be a dictionary, if provided"))
context = ir_context.RequestContext.from_dict(context)
params = {key: self.serializer.deserialize_entity(context, value)
for key, value in params.items()}
params['context'] = context
LOG.debug('RPC %s with %s', name, params)
try:
result = func(**params)
# FIXME(dtantsur): we could use the inspect module, but
# oslo_messaging.expected_exceptions messes up signatures.
except TypeError as exc:
raise InvalidParams(params=', '.join(params),
method=name, error=exc)
if context is not None:
# Currently it seems that we can serialize even with invalid
# context, but I'm not sure it's guaranteed to be the case.
result = self.serializer.serialize_entity(context, result)
LOG.debug('RPC %s returned %s', name, result)
return result
def start(self):
"""Start serving this service using loaded configuration.
:returns: None
"""
self.server.start()
def stop(self):
"""Stop serving this API.
:returns: None
"""
self.server.stop()
def wait(self):
"""Wait for the service to stop serving this API.
:returns: None
"""
self.server.wait()
def reset(self):
"""Reset server greenpool size to default.
:returns: None
"""
self.server.reset()

View File

@ -16,16 +16,19 @@
import signal
from oslo_config import cfg
from oslo_log import log
import oslo_messaging as messaging
from oslo_service import service
from oslo_utils import importutils
from ironic.common import context
from ironic.common.json_rpc import server as json_rpc
from ironic.common import rpc
from ironic.objects import base as objects_base
LOG = log.getLogger(__name__)
CONF = cfg.CONF
class RPCService(service.Service):
@ -44,10 +47,14 @@ class RPCService(service.Service):
super(RPCService, self).start()
admin_context = context.get_admin_context()
target = messaging.Target(topic=self.topic, server=self.host)
endpoints = [self.manager]
serializer = objects_base.IronicObjectSerializer(is_server=True)
self.rpcserver = rpc.get_server(target, endpoints, serializer)
if CONF.rpc_transport == 'json-rpc':
self.rpcserver = json_rpc.WSGIService(self.manager,
serializer)
else:
target = messaging.Target(topic=self.topic, server=self.host)
endpoints = [self.manager]
self.rpcserver = rpc.get_server(target, endpoints, serializer)
self.rpcserver.start()
self.handle_signal()

View File

@ -25,6 +25,7 @@ import oslo_messaging as messaging
from ironic.common import exception
from ironic.common import hash_ring
from ironic.common.i18n import _
from ironic.common.json_rpc import client as json_rpc
from ironic.common import release_mappings as versions
from ironic.common import rpc
from ironic.conductor import manager
@ -112,14 +113,19 @@ class ConductorAPI(object):
if self.topic is None:
self.topic = manager.MANAGER_TOPIC
target = messaging.Target(topic=self.topic,
version='1.0')
serializer = objects_base.IronicObjectSerializer()
release_ver = versions.RELEASE_MAPPING.get(CONF.pin_release_version)
version_cap = (release_ver['rpc'] if release_ver
else self.RPC_API_VERSION)
self.client = rpc.get_client(target, version_cap=version_cap,
serializer=serializer)
if CONF.rpc_transport == 'json-rpc':
self.client = json_rpc.Client(serializer=serializer,
version_cap=version_cap)
self.topic = ''
else:
target = messaging.Target(topic=self.topic, version='1.0')
self.client = rpc.get_client(target, version_cap=version_cap,
serializer=serializer)
use_groups = self.client.can_send_version('1.47')
# NOTE(deva): this is going to be buggy

View File

@ -35,6 +35,7 @@ from ironic.conf import inspector
from ironic.conf import ipmi
from ironic.conf import irmc
from ironic.conf import iscsi
from ironic.conf import json_rpc
from ironic.conf import metrics
from ironic.conf import metrics_statsd
from ironic.conf import neutron
@ -67,6 +68,7 @@ inspector.register_opts(CONF)
ipmi.register_opts(CONF)
irmc.register_opts(CONF)
iscsi.register_opts(CONF)
json_rpc.register_opts(CONF)
metrics.register_opts(CONF)
metrics_statsd.register_opts(CONF)
neutron.register_opts(CONF)

View File

@ -315,6 +315,12 @@ service_opts = [
'When doing a rolling upgrade from version N to version '
'N+1, set (to pin) this to N. To unpin (default), leave '
'it unset and the latest versions will be used.')),
cfg.StrOpt('rpc_transport',
default='oslo',
choices=[('oslo', _('use oslo.messaging transport')),
('json-rpc', _('use JSON RPC transport'))],
help=_('Which RPC transport implementation to use between '
'conductor and API services')),
]
utils_opts = [

44
ironic/conf/json_rpc.py Normal file
View File

@ -0,0 +1,44 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from ironic.common.i18n import _
from ironic.conf import auth
opts = [
cfg.StrOpt('auth_strategy',
choices=[('noauth', _('no authentication')),
('keystone', _('use the Identity service for '
'authentication'))],
help=_('Authentication strategy used by JSON RPC. Defaults to '
'the global auth_strategy setting.')),
cfg.HostAddressOpt('host_ip',
default='0.0.0.0',
help=_('The IP address or hostname on which JSON RPC '
'will listen.')),
cfg.PortOpt('port',
default=8089,
help=_('The port to use for JSON RPC')),
cfg.BoolOpt('use_ssl',
default=False,
help=_('Whether to use TLS for JSON RPC')),
]
def register_opts(conf):
conf.register_opts(opts, group='json_rpc')
auth.register_auth_opts(conf, 'json_rpc')
def list_opts():
return opts + auth.add_auth_opts([])

View File

@ -53,6 +53,7 @@ _opts = [
('ipmi', ironic.conf.ipmi.opts),
('irmc', ironic.conf.irmc.opts),
('iscsi', ironic.conf.iscsi.opts),
('json_rpc', ironic.conf.json_rpc.list_opts()),
('metrics', ironic.conf.metrics.opts),
('metrics_statsd', ironic.conf.metrics_statsd.opts),
('neutron', ironic.conf.neutron.list_opts()),

View File

@ -0,0 +1,495 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import fixtures
import mock
import oslo_messaging
import webob
from ironic.common import context as ir_ctx
from ironic.common import exception
from ironic.common.json_rpc import client
from ironic.common.json_rpc import server
from ironic import objects
from ironic.objects import base as objects_base
from ironic.tests import base as test_base
from ironic.tests.unit.objects import utils as obj_utils
class FakeManager(object):
def success(self, context, x, y=0):
assert isinstance(context, ir_ctx.RequestContext)
assert context.user_name == 'admin'
return x - y
def with_node(self, context, node):
assert isinstance(context, ir_ctx.RequestContext)
assert isinstance(node, objects.Node)
node.extra['answer'] = 42
return node
def no_result(self, context):
assert isinstance(context, ir_ctx.RequestContext)
return None
def no_context(self):
return 42
def fail(self, context, message):
assert isinstance(context, ir_ctx.RequestContext)
raise exception.IronicException(message)
@oslo_messaging.expected_exceptions(exception.Invalid)
def expected(self, context, message):
assert isinstance(context, ir_ctx.RequestContext)
raise exception.Invalid(message)
def crash(self, context):
raise RuntimeError('boom')
def init_host(self, context):
assert False, "This should not be exposed"
def _private(self, context):
assert False, "This should not be exposed"
# This should not be exposed either
value = 42
class TestService(test_base.TestCase):
def setUp(self):
super(TestService, self).setUp()
self.config(auth_strategy='noauth', group='json_rpc')
self.server_mock = self.useFixture(fixtures.MockPatch(
'oslo_service.wsgi.Server', autospec=True)).mock
self.serializer = objects_base.IronicObjectSerializer(is_server=True)
self.service = server.WSGIService(FakeManager(), self.serializer)
self.app = self.service._application
self.ctx = {'user_name': 'admin'}
def _request(self, name=None, params=None, expected_error=None,
request_id='abcd', **kwargs):
body = {
'jsonrpc': '2.0',
}
if request_id is not None:
body['id'] = request_id
if name is not None:
body['method'] = name
if params is not None:
body['params'] = params
if 'json_body' not in kwargs:
kwargs['json_body'] = body
kwargs.setdefault('method', 'POST')
kwargs.setdefault('headers', {'Content-Type': 'application/json'})
request = webob.Request.blank("/", **kwargs)
response = request.get_response(self.app)
self.assertEqual(response.status_code,
expected_error or (200 if request_id else 204))
if request_id is not None:
if expected_error:
self.assertEqual(expected_error,
response.json_body['error']['code'])
else:
return response.json_body
else:
self.assertFalse(response.text)
def _check(self, body, result=None, error=None, request_id='abcd'):
self.assertEqual('2.0', body.pop('jsonrpc'))
self.assertEqual(request_id, body.pop('id'))
if error is not None:
self.assertEqual({'error': error}, body)
else:
self.assertEqual({'result': result}, body)
def test_success(self):
body = self._request('success', {'context': self.ctx, 'x': 42})
self._check(body, result=42)
def test_success_no_result(self):
body = self._request('no_result', {'context': self.ctx})
self._check(body, result=None)
def test_notification(self):
body = self._request('no_result', {'context': self.ctx},
request_id=None)
self.assertIsNone(body)
def test_no_context(self):
body = self._request('no_context')
self._check(body, result=42)
def test_serialize_objects(self):
node = obj_utils.get_test_node(self.context)
node = self.serializer.serialize_entity(self.context, node)
body = self._request('with_node', {'context': self.ctx, 'node': node})
self.assertNotIn('error', body)
self.assertIsInstance(body['result'], dict)
node = self.serializer.deserialize_entity(self.context, body['result'])
self.assertEqual({'answer': 42}, node.extra)
def test_non_json_body(self):
for body in (b'', b'???', b"\xc3\x28"):
request = webob.Request.blank("/", method='POST', body=body)
response = request.get_response(self.app)
self._check(
response.json_body,
error={
'message': server.ParseError._msg_fmt,
'code': -32700,
},
request_id=None)
def test_invalid_requests(self):
bodies = [
# Invalid requests with request ID.
{'method': 'no_result', 'id': 'abcd',
'params': {'context': self.ctx}},
{'jsonrpc': '2.0', 'id': 'abcd', 'params': {'context': self.ctx}},
# These do not count as notifications, since they're malformed.
{'method': 'no_result', 'params': {'context': self.ctx}},
{'jsonrpc': '2.0', 'params': {'context': self.ctx}},
42,
# We do not implement batched requests.
[],
[{'jsonrpc': '2.0', 'method': 'no_result',
'params': {'context': self.ctx}}],
]
for body in bodies:
body = self._request(json_body=body)
self._check(
body,
error={
'message': server.InvalidRequest._msg_fmt,
'code': -32600,
},
request_id=body.get('id'))
def test_malformed_context(self):
body = self._request(json_body={'jsonrpc': '2.0', 'id': 'abcd',
'method': 'no_result',
'params': {'context': 42}})
self._check(
body,
error={
'message': 'Context must be a dictionary, if provided',
'code': -32602,
})
def test_expected_failure(self):
body = self._request('fail', {'context': self.ctx,
'message': 'some error'})
self._check(body,
error={
'message': 'some error',
'code': 500,
'data': {
'class': 'ironic.common.exception.IronicException'
}
})
def test_expected_failure_oslo(self):
# Check that exceptions wrapped by oslo's expected_exceptions get
# unwrapped correctly.
body = self._request('expected', {'context': self.ctx,
'message': 'some error'})
self._check(body,
error={
'message': 'some error',
'code': 400,
'data': {
'class': 'ironic.common.exception.Invalid'
}
})
@mock.patch.object(server.LOG, 'exception', autospec=True)
def test_unexpected_failure(self, mock_log):
body = self._request('crash', {'context': self.ctx})
self._check(body,
error={
'message': 'boom',
'code': 500,
})
self.assertTrue(mock_log.called)
def test_method_not_found(self):
body = self._request('banana', {'context': self.ctx})
self._check(body,
error={
'message': 'Method banana was not found',
'code': -32601,
})
def test_no_blacklisted_methods(self):
for name in ('__init__', '_private', 'init_host', 'value'):
body = self._request(name, {'context': self.ctx})
self._check(body,
error={
'message': 'Method %s was not found' % name,
'code': -32601,
})
def test_missing_argument(self):
body = self._request('success', {'context': self.ctx})
# The exact error message depends on the Python version
self.assertEqual(-32602, body['error']['code'])
self.assertNotIn('result', body)
def test_method_not_post(self):
self._request('success', {'context': self.ctx, 'x': 42},
method='GET', expected_error=405)
def test_authenticated(self):
self.config(auth_strategy='keystone', group='json_rpc')
self.service = server.WSGIService(FakeManager(), self.serializer)
self.app = self.server_mock.call_args[0][2]
self._request('success', {'context': self.ctx, 'x': 42},
expected_error=401)
def test_authenticated_no_admin_role(self):
self.config(auth_strategy='keystone', group='json_rpc')
self._request('success', {'context': self.ctx, 'x': 42},
expected_error=403)
@mock.patch.object(client, '_get_session', autospec=True)
class TestClient(test_base.TestCase):
def setUp(self):
super(TestClient, self).setUp()
self.serializer = objects_base.IronicObjectSerializer(is_server=True)
self.client = client.Client(self.serializer)
self.ctx_json = self.context.to_dict()
def test_can_send_version(self, mock_session):
self.assertTrue(self.client.can_send_version('1.42'))
self.client = client.Client(self.serializer, version_cap='1.42')
self.assertTrue(self.client.can_send_version('1.42'))
self.assertTrue(self.client.can_send_version('1.0'))
self.assertFalse(self.client.can_send_version('1.99'))
self.assertFalse(self.client.can_send_version('2.0'))
def test_call_success(self, mock_session):
response = mock_session.return_value.post.return_value
response.json.return_value = {
'jsonrpc': '2.0',
'result': 42
}
cctx = self.client.prepare('foo.example.com')
self.assertEqual('example.com', cctx.host)
result = cctx.call(self.context, 'do_something', answer=42)
self.assertEqual(42, result)
mock_session.return_value.post.assert_called_once_with(
'http://example.com:8089',
json={'jsonrpc': '2.0',
'method': 'do_something',
'params': {'answer': 42, 'context': self.ctx_json},
'id': self.context.request_id})
def test_call_success_with_version(self, mock_session):
response = mock_session.return_value.post.return_value
response.json.return_value = {
'jsonrpc': '2.0',
'result': 42
}
cctx = self.client.prepare('foo.example.com', version='1.42')
self.assertEqual('example.com', cctx.host)
result = cctx.call(self.context, 'do_something', answer=42)
self.assertEqual(42, result)
mock_session.return_value.post.assert_called_once_with(
'http://example.com:8089',
json={'jsonrpc': '2.0',
'method': 'do_something',
'params': {'answer': 42, 'context': self.ctx_json,
'rpc.version': '1.42'},
'id': self.context.request_id})
def test_call_success_with_version_and_cap(self, mock_session):
self.client = client.Client(self.serializer, version_cap='1.99')
response = mock_session.return_value.post.return_value
response.json.return_value = {
'jsonrpc': '2.0',
'result': 42
}
cctx = self.client.prepare('foo.example.com', version='1.42')
self.assertEqual('example.com', cctx.host)
result = cctx.call(self.context, 'do_something', answer=42)
self.assertEqual(42, result)
mock_session.return_value.post.assert_called_once_with(
'http://example.com:8089',
json={'jsonrpc': '2.0',
'method': 'do_something',
'params': {'answer': 42, 'context': self.ctx_json,
'rpc.version': '1.42'},
'id': self.context.request_id})
def test_cast_success(self, mock_session):
cctx = self.client.prepare('foo.example.com')
self.assertEqual('example.com', cctx.host)
result = cctx.cast(self.context, 'do_something', answer=42)
self.assertIsNone(result)
mock_session.return_value.post.assert_called_once_with(
'http://example.com:8089',
json={'jsonrpc': '2.0',
'method': 'do_something',
'params': {'answer': 42, 'context': self.ctx_json}})
def test_cast_success_with_version(self, mock_session):
cctx = self.client.prepare('foo.example.com', version='1.42')
self.assertEqual('example.com', cctx.host)
result = cctx.cast(self.context, 'do_something', answer=42)
self.assertIsNone(result)
mock_session.return_value.post.assert_called_once_with(
'http://example.com:8089',
json={'jsonrpc': '2.0',
'method': 'do_something',
'params': {'answer': 42, 'context': self.ctx_json,
'rpc.version': '1.42'}})
def test_call_serialization(self, mock_session):
node = obj_utils.get_test_node(self.context)
node_json = self.serializer.serialize_entity(self.context, node)
response = mock_session.return_value.post.return_value
response.json.return_value = {
'jsonrpc': '2.0',
'result': node_json
}
cctx = self.client.prepare('foo.example.com')
self.assertEqual('example.com', cctx.host)
result = cctx.call(self.context, 'do_something', node=node)
self.assertIsInstance(result, objects.Node)
self.assertEqual(result.uuid, node.uuid)
mock_session.return_value.post.assert_called_once_with(
'http://example.com:8089',
json={'jsonrpc': '2.0',
'method': 'do_something',
'params': {'node': node_json, 'context': self.ctx_json},
'id': self.context.request_id})
def test_call_failure(self, mock_session):
response = mock_session.return_value.post.return_value
response.json.return_value = {
'jsonrpc': '2.0',
'error': {
'code': 418,
'message': 'I am a teapot',
'data': {
'class': 'ironic.common.exception.Invalid'
}
}
}
cctx = self.client.prepare('foo.example.com')
self.assertEqual('example.com', cctx.host)
# Make sure that the class is restored correctly for expected errors.
exc = self.assertRaises(exception.Invalid,
cctx.call,
self.context, 'do_something', answer=42)
# Code from the body has priority over one in the class.
self.assertEqual(418, exc.code)
self.assertIn('I am a teapot', str(exc))
mock_session.return_value.post.assert_called_once_with(
'http://example.com:8089',
json={'jsonrpc': '2.0',
'method': 'do_something',
'params': {'answer': 42, 'context': self.ctx_json},
'id': self.context.request_id})
def test_call_unexpected_failure(self, mock_session):
response = mock_session.return_value.post.return_value
response.json.return_value = {
'jsonrpc': '2.0',
'error': {
'code': 500,
'message': 'AttributeError',
}
}
cctx = self.client.prepare('foo.example.com')
self.assertEqual('example.com', cctx.host)
exc = self.assertRaises(exception.IronicException,
cctx.call,
self.context, 'do_something', answer=42)
self.assertEqual(500, exc.code)
self.assertIn('Unexpected error', str(exc))
mock_session.return_value.post.assert_called_once_with(
'http://example.com:8089',
json={'jsonrpc': '2.0',
'method': 'do_something',
'params': {'answer': 42, 'context': self.ctx_json},
'id': self.context.request_id})
def test_call_failure_with_foreign_class(self, mock_session):
# This should not happen, but provide an additional safeguard
response = mock_session.return_value.post.return_value
response.json.return_value = {
'jsonrpc': '2.0',
'error': {
'code': 500,
'message': 'AttributeError',
'data': {
'class': 'AttributeError'
}
}
}
cctx = self.client.prepare('foo.example.com')
self.assertEqual('example.com', cctx.host)
exc = self.assertRaises(exception.IronicException,
cctx.call,
self.context, 'do_something', answer=42)
self.assertEqual(500, exc.code)
self.assertIn('Unexpected error', str(exc))
mock_session.return_value.post.assert_called_once_with(
'http://example.com:8089',
json={'jsonrpc': '2.0',
'method': 'do_something',
'params': {'answer': 42, 'context': self.ctx_json},
'id': self.context.request_id})
def test_cast_failure(self, mock_session):
# Cast cannot return normal failures, but make sure we ignore them even
# if server sends something in violation of the protocol (or because
# it's a low-level error like HTTP Forbidden).
response = mock_session.return_value.post.return_value
response.json.return_value = {
'jsonrpc': '2.0',
'error': {
'code': 418,
'message': 'I am a teapot',
'data': {
'class': 'ironic.common.exception.IronicException'
}
}
}
cctx = self.client.prepare('foo.example.com')
self.assertEqual('example.com', cctx.host)
result = cctx.cast(self.context, 'do_something', answer=42)
self.assertIsNone(result)
mock_session.return_value.post.assert_called_once_with(
'http://example.com:8089',
json={'jsonrpc': '2.0',
'method': 'do_something',
'params': {'answer': 42, 'context': self.ctx_json}})
def test_call_failure_with_version_and_cap(self, mock_session):
self.client = client.Client(self.serializer, version_cap='1.42')
cctx = self.client.prepare('foo.example.com', version='1.99')
self.assertRaisesRegex(RuntimeError,
"requested version 1.99, maximum allowed "
"version is 1.42",
cctx.call, self.context, 'do_something',
answer=42)
self.assertFalse(mock_session.return_value.post.called)

View File

@ -0,0 +1,8 @@
---
features:
- |
Adds the ability to use JSON RPC for communication between API and
conductor services. To use it set the new ``rpc_transport`` configuration
options to ``json-rpc`` and configure the credentials and the ``host_ip``
in the ``json_rpc`` section. Hostnames of all conductors must be
resolvable for this implementation to work.

View File

@ -92,6 +92,7 @@
IRONIC_AUTOMATED_CLEAN_ENABLED: False
IRONIC_DEFAULT_DEPLOY_INTERFACE: direct
IRONIC_ENABLED_DEPLOY_INTERFACES: "iscsi,direct,ansible"
IRONIC_RPC_TRANSPORT: json-rpc
IRONIC_VM_COUNT: 6
SWIFT_ENABLE_TEMPURLS: True
SWIFT_TEMPURL_KEY: secretkey
@ -269,7 +270,9 @@
IRONIC_DEFAULT_NETWORK_INTERFACE: noop
IRONIC_TEMPEST_WHOLE_DISK_IMAGE: True
IRONIC_VM_EPHEMERAL_DISK: 0
IRONIC_RPC_TRANSPORT: json-rpc
devstack_services:
rabbit: False
g-api: False
g-reg: False
n-api: False