From f664668cf041ba5eddcb4fc158019cb080dfbe24 Mon Sep 17 00:00:00 2001 From: lawrancejing Date: Mon, 9 Nov 2015 07:10:34 +0000 Subject: [PATCH] Refactor evoque engine see oslo usage and magnum Change-Id: I0d78e2ef4fc028cd102fe344000572ab07324ec0 --- evoque/api/__init__.py | 9 +- evoque/api/app.py | 6 + evoque/api/hooks.py | 71 +++++++ evoque/cmd/api.py | 4 +- evoque/cmd/engine.py | 73 +++---- evoque/common/consts.py | 85 -------- evoque/common/context.py | 124 +++++------- evoque/common/messaging.py | 107 ---------- evoque/common/rpc.py | 128 ++++++++++++ evoque/common/rpc_service.py | 83 ++++++++ evoque/db/sqlalchemy/api.py | 8 +- evoque/engine/dispatcher.py | 97 ---------- evoque/engine/service.py | 226 ---------------------- evoque/{rpc => engine/ticket}/__init__.py | 0 evoque/engine/ticket/api.py | 22 +++ evoque/engine/ticket/endpoint.py | 24 +++ evoque/opts.py | 15 +- evoque/rpc/client.py | 72 ------- evoque/service.py | 30 ++- requirements.txt | 1 + 20 files changed, 440 insertions(+), 745 deletions(-) create mode 100644 evoque/api/hooks.py delete mode 100644 evoque/common/consts.py delete mode 100644 evoque/common/messaging.py create mode 100644 evoque/common/rpc.py create mode 100644 evoque/common/rpc_service.py delete mode 100644 evoque/engine/dispatcher.py delete mode 100644 evoque/engine/service.py rename evoque/{rpc => engine/ticket}/__init__.py (100%) create mode 100644 evoque/engine/ticket/api.py create mode 100644 evoque/engine/ticket/endpoint.py delete mode 100644 evoque/rpc/client.py diff --git a/evoque/api/__init__.py b/evoque/api/__init__.py index 9ddcdbd..b0c103a 100644 --- a/evoque/api/__init__.py +++ b/evoque/api/__init__.py @@ -15,24 +15,19 @@ from pecan import rest from oslo_log import log -from evoque.rpc import client as rpc_client - LOG = log.getLogger(__name__) class TicketController(rest.RestController): - def __init__(self): - self.rpc_client = rpc_client.EngineClient() - @pecan.expose('json') def get(self): return {"version": "1.0.0"} @pecan.expose('json') def post(self, **kwargs): - ticket = self.rpc_client.ticket_create( - {"fake": 'fake'}, kwargs['name']) + ticket = pecan.request.rpcapi.ticket_create( + name=kwargs['name']) return ticket diff --git a/evoque/api/app.py b/evoque/api/app.py index d8d8697..8095414 100644 --- a/evoque/api/app.py +++ b/evoque/api/app.py @@ -15,6 +15,7 @@ import pecan import webob.exc from werkzeug import serving +from evoque.api import hooks from evoque import exceptions from evoque import service @@ -25,6 +26,10 @@ PECAN_CONFIG = { 'app': { 'root': 'evoque.api.RootController', 'modules': ['evoque.api'], + 'hooks': [ + hooks.ContextHook(), + hooks.RPCHook(), + ], }, } @@ -56,6 +61,7 @@ def setup_app(config=PECAN_CONFIG, cfg=None): app = pecan.make_app( config['app']['root'], debug=pecan_debug, + hooks=config['app']['hooks'], guess_content_type_from_ext=False, ) diff --git a/evoque/api/hooks.py b/evoque/api/hooks.py new file mode 100644 index 0000000..08a20a2 --- /dev/null +++ b/evoque/api/hooks.py @@ -0,0 +1,71 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from pecan import hooks + +from evoque.common import context +from evoque.engine.ticket import api as ticket_api + + +class ContextHook(hooks.PecanHook): + """Configures a request context and attaches it to the request. + + The following HTTP request headers are used: + + X-User-Name: + Used for context.user_name. + + X-User-Id: + Used for context.user_id. + + X-Project-Name: + Used for context.project. + + X-Project-Id: + Used for context.project_id. + + X-Auth-Token: + Used for context.auth_token. + + X-Roles: + Used for context.roles. + """ + + def before(self, state): + headers = state.request.headers + user_name = headers.get('X-User-Name') + user_id = headers.get('X-User-Id') + project = headers.get('X-Project-Name') + project_id = headers.get('X-Project-Id') + domain_id = headers.get('X-User-Domain-Id') + domain_name = headers.get('X-User-Domain-Name') + auth_token = headers.get('X-Auth-Token') + roles = headers.get('X-Roles', '').split(',') + auth_token_info = state.request.environ.get('keystone.token_info') + + state.request.context = context.make_context( + auth_token=auth_token, + auth_token_info=auth_token_info, + user_name=user_name, + user_id=user_id, + project_name=project, + project_id=project_id, + domain_id=domain_id, + domain_name=domain_name, + roles=roles) + + +class RPCHook(hooks.PecanHook): + """Attach the rpcapi object to the request so controllers can get to it.""" + + def before(self, state): + state.request.rpcapi = ticket_api.API(context=state.request.context) diff --git a/evoque/cmd/api.py b/evoque/cmd/api.py index 5a49e65..9e565a2 100644 --- a/evoque/cmd/api.py +++ b/evoque/cmd/api.py @@ -11,10 +11,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from evoque.common import messaging +from evoque.api import app def main(): - messaging.setup() - from evoque.api import app app.build_server() diff --git a/evoque/cmd/engine.py b/evoque/cmd/engine.py index 82609dd..0954615 100644 --- a/evoque/cmd/engine.py +++ b/evoque/cmd/engine.py @@ -1,68 +1,45 @@ -#!/usr/bin/env python -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. -""" -Evoque Engine Server. -""" - -import eventlet -eventlet.monkey_patch() +"""Evoque engine service.""" import os -import sys - -# If ../evoque/__init__.py exists, add ../ to Python search path, so that -# it will override what happens to be installed in /usr/(local/)lib/python... -POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), - os.pardir, - os.pardir)) -if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'evoque', '__init__.py')): - sys.path.insert(0, POSSIBLE_TOPDIR) +import uuid from oslo_config import cfg -from oslo_i18n import _lazy from oslo_log import log as logging from oslo_service import service -from evoque.common import consts -from evoque.common import messaging -from evoque import opts +from evoque.common import rpc_service +from evoque.engine.ticket import endpoint as ticket_endpoint +from evoque.i18n import _LI +from evoque import service as evoque_service -_lazy.enable_lazy() - -LOG = logging.getLogger('evoque.engine') +LOG = logging.getLogger(__name__) def main(): - conf = cfg.ConfigOpts() + evoque_service.prepare_service() - # Register Evoque options - for group, options in opts.list_opts(): - conf.register_opts(list(options), - group=None if group == "DEFAULT" else group) + LOG.info(_LI('Starting evoque engine in PID %s') % os.getpid()) - logging.register_options(cfg.CONF) - cfg.CONF(project='evoque', prog='evoque-engine') - logging.setup(cfg.CONF, 'evoque-engine') - logging.set_defaults() - messaging.setup() + conductor_id = str(uuid.uuid4()) - from evoque.engine import service as engine + endpoints = [ + ticket_endpoint.Handler(), + ] - srv = engine.EngineService(conf.host, consts.ENGINE_TOPIC) - launcher = service.launch(cfg.CONF, srv, - workers=1) - # the following periodic tasks are intended serve as HA checking - # srv.create_periodic_tasks() + server = rpc_service.Service.create("evoque-engine", + conductor_id, endpoints, + binary='evoque-engine') + launcher = service.launch(cfg.CONF, server) launcher.wait() diff --git a/evoque/common/consts.py b/evoque/common/consts.py deleted file mode 100644 index 5af83cb..0000000 --- a/evoque/common/consts.py +++ /dev/null @@ -1,85 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -RPC_ATTRS = ( - ENGINE_TOPIC, - ENGINE_DISPATCHER_TOPIC, - ENGINE_HEALTH_MGR_TOPIC, - RPC_API_VERSION, -) = ( - 'evoque-engine', - 'engine-dispatcher', - 'engine-health_mgr', - '1.0', -) - -RPC_PARAMS = ( - PARAM_SHOW_DELETED, PARAM_SHOW_NESTED, PARAM_LIMIT, PARAM_MARKER, - PARAM_GLOBAL_PROJECT, PARAM_SHOW_DETAILS, - PARAM_SORT_DIR, PARAM_SORT_KEYS, -) = ( - 'show_deleted', 'show_nested', 'limit', 'marker', - 'global_project', 'show_details', - 'sort_dir', 'sort_keys', -) - -ACTION_NAMES = ( - CLUSTER_CREATE, CLUSTER_DELETE, CLUSTER_UPDATE, - CLUSTER_ADD_NODES, CLUSTER_DEL_NODES, CLUSTER_RESIZE, - CLUSTER_SCALE_OUT, CLUSTER_SCALE_IN, - CLUSTER_ATTACH_POLICY, CLUSTER_DETACH_POLICY, CLUSTER_UPDATE_POLICY, - - NODE_CREATE, NODE_DELETE, NODE_UPDATE, - NODE_JOIN, NODE_LEAVE, - - POLICY_ENABLE, POLICY_DISABLE, POLICY_UPDATE, -) = ( - 'CLUSTER_CREATE', 'CLUSTER_DELETE', 'CLUSTER_UPDATE', - 'CLUSTER_ADD_NODES', 'CLUSTER_DEL_NODES', 'CLUSTER_RESIZE', - 'CLUSTER_SCALE_OUT', 'CLUSTER_SCALE_IN', - 'CLUSTER_ATTACH_POLICY', 'CLUSTER_DETACH_POLICY', 'CLUSTER_UPDATE_POLICY', - - 'NODE_CREATE', 'NODE_DELETE', 'NODE_UPDATE', - 'NODE_JOIN', 'NODE_LEAVE', - - 'POLICY_ENABLE', 'POLICY_DISABLE', 'POLICY_UPDATE', -) - -ADJUSTMENT_PARAMS = ( - ADJUSTMENT_TYPE, ADJUSTMENT_NUMBER, ADJUSTMENT_MIN_STEP, - ADJUSTMENT_MIN_SIZE, ADJUSTMENT_MAX_SIZE, ADJUSTMENT_STRICT, -) = ( - 'adjustment_type', 'number', 'min_step', - 'min_size', 'max_size', 'strict', -) - -ADJUSTMENT_TYPES = ( - EXACT_CAPACITY, CHANGE_IN_CAPACITY, CHANGE_IN_PERCENTAGE, -) = ( - 'EXACT_CAPACITY', 'CHANGE_IN_CAPACITY', 'CHANGE_IN_PERCENTAGE', -) - -CLUSTER_ATTRS = ( - CLUSTER_NAME, CLUSTER_PROFILE, CLUSTER_DESIRED_CAPACITY, - CLUSTER_MIN_SIZE, CLUSTER_MAX_SIZE, CLUSTER_ID, CLUSTER_PARENT, - CLUSTER_DOMAIN, CLUSTER_PROJECT, CLUSTER_USER, - CLUSTER_CREATED_TIME, CLUSTER_UPDATED_TIME, CLUSTER_DELETED_TIME, - CLUSTER_STATUS, CLUSTER_STATUS_REASON, CLUSTER_TIMEOUT, - CLUSTER_METADATA, -) = ( - 'name', 'profile_id', 'desired_capacity', - 'min_size', 'max_size', 'id', 'parent', - 'domain', 'project', 'user', - 'created_time', 'updated_time', 'deleted_time', - 'status', 'status_reason', 'timeout', - 'metadata', -) diff --git a/evoque/common/context.py b/evoque/common/context.py index 155cf94..af14217 100644 --- a/evoque/common/context.py +++ b/evoque/common/context.py @@ -2,7 +2,7 @@ # not use this file except in compliance with the License. You may obtain # a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT @@ -10,92 +10,68 @@ # License for the specific language governing permissions and limitations # under the License. -from oslo_context import context as base_context -from oslo_utils import encodeutils - -from evoque.db import api as db_api +from oslo_context import context -class RequestContext(base_context.RequestContext): - '''Stores information about the security context. +class RequestContext(context.RequestContext): + """Extends security contexts from the OpenStack common library.""" - The context encapsulates information related to the user accessing the - the system, as well as additional request information. - ''' + def __init__(self, auth_token=None, auth_url=None, domain_id=None, + domain_name=None, user_name=None, user_id=None, + project_name=None, project_id=None, roles=None, + is_admin=False, read_only=False, show_deleted=False, + request_id=None, trust_id=None, auth_token_info=None, + all_tenants=False, **kwargs): + """Stores several additional request parameters: - def __init__(self, auth_token=None, user=None, project=None, - domain=None, user_domain=None, project_domain=None, - is_admin=None, read_only=False, show_deleted=False, - request_id=None, auth_url=None, trusts=None, - user_name=None, project_name=None, domain_name=None, - user_domain_name=None, project_domain_name=None, - auth_token_info=None, region_name=None, roles=None, - password=None, **kwargs): - - '''Initializer of request context.''' - # We still have 'tenant' param because oslo_context still use it. - super(RequestContext, self).__init__( - auth_token=auth_token, user=user, tenant=project, - domain=domain, user_domain=user_domain, - project_domain=project_domain, - read_only=read_only, show_deleted=show_deleted, - request_id=request_id) - - # request_id might be a byte array - self.request_id = encodeutils.safe_decode(self.request_id) - - # we save an additional 'project' internally for use - self.project = project - - # Session for DB access - self._session = None - - self.auth_url = auth_url - self.trusts = trusts + :param domain_id: The ID of the domain. + :param domain_name: The name of the domain. + """ self.user_name = user_name + self.user_id = user_id self.project_name = project_name + self.project_id = project_id + self.domain_id = domain_id self.domain_name = domain_name - self.user_domain_name = user_domain_name - self.project_domain_name = project_domain_name - + self.roles = roles + self.auth_url = auth_url self.auth_token_info = auth_token_info - self.region_name = region_name - self.roles = roles or [] - self.password = password + self.trust_id = trust_id + self.all_tenants = all_tenants - self.is_admin = is_admin - - @property - def session(self): - if self._session is None: - self._session = db_api.get_session() - return self._session + super(RequestContext, self).__init__(auth_token=auth_token, + user=user_name, + tenant=project_name, + is_admin=is_admin, + read_only=read_only, + show_deleted=show_deleted, + request_id=request_id) def to_dict(self): - return { - 'auth_url': self.auth_url, - 'auth_token': self.auth_token, - 'auth_token_info': self.auth_token_info, - 'user': self.user, - 'user_name': self.user_name, - 'user_domain': self.user_domain, - 'user_domain_name': self.user_domain_name, - 'project': self.project, - 'project_name': self.project_name, - 'project_domain': self.project_domain, - 'project_domain_name': self.project_domain_name, - 'domain': self.domain, - 'domain_name': self.domain_name, - 'trusts': self.trusts, - 'region_name': self.region_name, - 'roles': self.roles, - 'show_deleted': self.show_deleted, - 'is_admin': self.is_admin, - 'request_id': self.request_id, - 'password': self.password, - } + value = super(RequestContext, self).to_dict() + value.update({'auth_token': self.auth_token, + 'auth_url': self.auth_url, + 'domain_id': self.domain_id, + 'domain_name': self.domain_name, + 'user_name': self.user_name, + 'user_id': self.user_id, + 'project_name': self.project_name, + 'project_id': self.project_id, + 'is_admin': self.is_admin, + 'read_only': self.read_only, + 'roles': self.roles, + 'show_deleted': self.show_deleted, + 'request_id': self.request_id, + 'trust_id': self.trust_id, + 'auth_token_info': self.auth_token_info, + 'all_tenants': self.all_tenants}) + return value @classmethod def from_dict(cls, values): return cls(**values) + + +def make_context(*args, **kwargs): + return RequestContext(*args, **kwargs) diff --git a/evoque/common/messaging.py b/evoque/common/messaging.py deleted file mode 100644 index 3990c9d..0000000 --- a/evoque/common/messaging.py +++ /dev/null @@ -1,107 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import eventlet - -from oslo_config import cfg -import oslo_messaging -from oslo_serialization import jsonutils - -from evoque.common import context - -TRANSPORT = None -NOTIFIER = None - - -class RequestContextSerializer(oslo_messaging.Serializer): - def __init__(self, base): - self._base = base - - def serialize_entity(self, ctxt, entity): - if not self._base: - return entity - return self._base.serialize_entity(ctxt, entity) - - def deserialize_entity(self, ctxt, entity): - if not self._base: - return entity - return self._base.deserialize_entity(ctxt, entity) - - @staticmethod - def serialize_context(ctxt): - return {} - - @staticmethod - def deserialize_context(ctxt): - return context.RequestContext.from_dict(ctxt) - - -class JsonPayloadSerializer(oslo_messaging.NoOpSerializer): - @classmethod - def serialize_entity(cls, context, entity): - return jsonutils.to_primitive(entity, convert_instances=True) - - -def setup(url=None, optional=False): - """Initialise the oslo_messaging layer.""" - global TRANSPORT, NOTIFIER - - if url and url.startswith("fake://"): - # NOTE(sileht): oslo_messaging fake driver uses time.sleep - # for task switch, so we need to monkey_patch it - eventlet.monkey_patch(time=True) - - if not TRANSPORT: - oslo_messaging.set_transport_defaults('evoque') - exmods = ['evoque.exception'] - try: - TRANSPORT = oslo_messaging.get_transport( - cfg.CONF, url, allowed_remote_exmods=exmods) - except oslo_messaging.InvalidTransportURL as e: - TRANSPORT = None - if not optional or e.url: - # NOTE(sileht): oslo_messaging is configured but unloadable - # so reraise the exception - raise - - if not NOTIFIER and TRANSPORT: - serializer = RequestContextSerializer(JsonPayloadSerializer()) - NOTIFIER = oslo_messaging.Notifier(TRANSPORT, serializer=serializer) - - -def cleanup(): - """Cleanup the oslo_messaging layer.""" - global TRANSPORT, NOTIFIER - if TRANSPORT: - TRANSPORT.cleanup() - TRANSPORT = NOTIFIER = None - - -def get_rpc_server(target, endpoint): - """Return a configured oslo_messaging rpc server.""" - serializer = RequestContextSerializer(JsonPayloadSerializer()) - return oslo_messaging.get_rpc_server(TRANSPORT, target, [endpoint], - executor='eventlet', - serializer=serializer) - - -def get_rpc_client(**kwargs): - """Return a configured oslo_messaging RPCClient.""" - target = oslo_messaging.Target(**kwargs) - serializer = RequestContextSerializer(JsonPayloadSerializer()) - return oslo_messaging.RPCClient(TRANSPORT, target, - serializer=serializer) - - -def get_notifier(publisher_id): - """Return a configured oslo_messaging notifier.""" - return NOTIFIER.prepare(publisher_id=publisher_id) diff --git a/evoque/common/rpc.py b/evoque/common/rpc.py new file mode 100644 index 0000000..927920b --- /dev/null +++ b/evoque/common/rpc.py @@ -0,0 +1,128 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +__all__ = [ + 'init', + 'cleanup', + 'set_defaults', + 'add_extra_exmods', + 'clear_extra_exmods', + 'get_allowed_exmods', + 'RequestContextSerializer', + 'get_client', + 'get_server', + 'get_notifier', +] + +from oslo_config import cfg +import oslo_messaging as messaging +from oslo_serialization import jsonutils + +from evoque.common import context as evoque_context +from evoque import exceptions + + +CONF = cfg.CONF +TRANSPORT = None +NOTIFIER = None + +ALLOWED_EXMODS = [ + exceptions.__name__, +] +EXTRA_EXMODS = [] + + +def init(conf): + global TRANSPORT, NOTIFIER + exmods = get_allowed_exmods() + TRANSPORT = messaging.get_transport(conf, + allowed_remote_exmods=exmods) + serializer = RequestContextSerializer(JsonPayloadSerializer()) + NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer) + + +def cleanup(): + global TRANSPORT, NOTIFIER + assert TRANSPORT is not None + assert NOTIFIER is not None + TRANSPORT.cleanup() + TRANSPORT = NOTIFIER = None + + +def set_defaults(control_exchange): + messaging.set_transport_defaults(control_exchange) + + +def add_extra_exmods(*args): + EXTRA_EXMODS.extend(args) + + +def clear_extra_exmods(): + del EXTRA_EXMODS[:] + + +def get_allowed_exmods(): + return ALLOWED_EXMODS + EXTRA_EXMODS + + +class JsonPayloadSerializer(messaging.NoOpSerializer): + @staticmethod + def serialize_entity(context, entity): + return jsonutils.to_primitive(entity, convert_instances=True) + + +class RequestContextSerializer(messaging.Serializer): + + def __init__(self, base): + self._base = base + + def serialize_entity(self, context, entity): + if not self._base: + return entity + return self._base.serialize_entity(context, entity) + + def deserialize_entity(self, context, entity): + if not self._base: + return entity + return self._base.deserialize_entity(context, entity) + + def serialize_context(self, context): + return context.to_dict() + + def deserialize_context(self, context): + return evoque_context.RequestContext.from_dict(context) + + +def get_client(target, version_cap=None, serializer=None): + assert TRANSPORT is not None + serializer = RequestContextSerializer(serializer) + return messaging.RPCClient(TRANSPORT, + target, + version_cap=version_cap, + serializer=serializer) + + +def get_server(target, endpoints, serializer=None): + assert TRANSPORT is not None + serializer = RequestContextSerializer(serializer) + return messaging.get_rpc_server(TRANSPORT, + target, + endpoints, + executor='eventlet', + serializer=serializer) + + +def get_notifier(service='container', host=None, publisher_id=None): + assert NOTIFIER is not None + if not publisher_id: + publisher_id = "%s.%s" % (service, host or CONF.host) + return NOTIFIER.prepare(publisher_id=publisher_id) diff --git a/evoque/common/rpc_service.py b/evoque/common/rpc_service.py new file mode 100644 index 0000000..811253e --- /dev/null +++ b/evoque/common/rpc_service.py @@ -0,0 +1,83 @@ +# Copyright 2014 - Rackspace Hosting +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Common RPC service and API tools for Magnum.""" + +import eventlet +from oslo_config import cfg +import oslo_messaging as messaging +from oslo_service import service + +from evoque.common import rpc + +# NOTE(paulczar): +# Ubuntu 14.04 forces librabbitmq when kombu is used +# Unfortunately it forces a version that has a crash +# bug. Calling eventlet.monkey_patch() tells kombu +# to use libamqp instead. +eventlet.monkey_patch() + +CONF = cfg.CONF + + +class Service(service.Service): + + def __init__(self, topic, server, handlers, binary): + super(Service, self).__init__() + serializer = rpc.RequestContextSerializer( + rpc.JsonPayloadSerializer()) + transport = messaging.get_transport(cfg.CONF) + # TODO(asalkeld) add support for version='x.y' + target = messaging.Target(topic=topic, server=server) + self._server = messaging.get_rpc_server(transport, target, handlers, + serializer=serializer) + self.binary = binary + + def start(self): + self._server.start() + + def wait(self): + self._server.wait() + + @classmethod + def create(cls, topic, server, handlers, binary): + service_obj = cls(topic, server, handlers, binary) + return service_obj + + +class API(object): + def __init__(self, transport=None, context=None, topic=None, server=None, + timeout=None): + serializer = rpc.RequestContextSerializer( + rpc.JsonPayloadSerializer()) + if transport is None: + exmods = rpc.get_allowed_exmods() + transport = messaging.get_transport(cfg.CONF, + allowed_remote_exmods=exmods) + self._context = context + if topic is None: + topic = '' + target = messaging.Target(topic=topic, server=server) + self._client = messaging.RPCClient(transport, target, + serializer=serializer, + timeout=timeout) + + def _call(self, method, *args, **kwargs): + return self._client.call(self._context, method, *args, **kwargs) + + def _cast(self, method, *args, **kwargs): + self._client.cast(self._context, method, *args, **kwargs) + + def echo(self, message): + self._cast('echo', message=message) diff --git a/evoque/db/sqlalchemy/api.py b/evoque/db/sqlalchemy/api.py index 64f7a7e..35e4c2c 100644 --- a/evoque/db/sqlalchemy/api.py +++ b/evoque/db/sqlalchemy/api.py @@ -44,20 +44,20 @@ def get_backend(): def model_query(context, *args): - session = _session(context) + session = _session() query = session.query(*args) return query -def _session(context): - return (context and context.session) or get_session() +def _session(): + return get_session() # Tickets def ticket_create(context, values): ticket_ref = models.Ticket() ticket_ref.update(values) - ticket_ref.save(_session(context)) + ticket_ref.save(_session()) return ticket_ref diff --git a/evoque/engine/dispatcher.py b/evoque/engine/dispatcher.py deleted file mode 100644 index d4b3ef9..0000000 --- a/evoque/engine/dispatcher.py +++ /dev/null @@ -1,97 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_context import context as oslo_context -from oslo_log import log as logging -import oslo_messaging -from oslo_service import service - -from evoque.common import consts -from evoque.common import messaging as rpc_messaging -from evoque.i18n import _LI - -LOG = logging.getLogger(__name__) - -OPERATIONS = ( - START_ACTION, CANCEL_ACTION, STOP -) = ( - 'start_action', 'cancel_action', 'stop' -) - - -class Dispatcher(service.Service): - '''Listen on an AMQP queue named for the engine. - - Receive notification from engine services and schedule actions. - ''' - def __init__(self, engine_service, topic, version, thread_group_mgr): - super(Dispatcher, self).__init__() - self.TG = thread_group_mgr - self.engine_id = engine_service.engine_id - self.topic = topic - self.version = version - - def start(self): - super(Dispatcher, self).start() - self.target = oslo_messaging.Target(server=self.engine_id, - topic=self.topic, - version=self.version) - server = rpc_messaging.get_rpc_server(self.target, self) - server.start() - - def listening(self, ctxt): - '''Respond affirmatively to confirm that engine is still alive.''' - return True - - def stop(self): - super(Dispatcher, self).stop() - # Wait for all action threads to be finished - LOG.info(_LI("Stopping all action threads of engine %s"), - self.engine_id) - # Stop ThreadGroup gracefully - self.TG.stop(True) - LOG.info(_LI("All action threads have been finished")) - - -def notify(method, engine_id=None, **kwargs): - '''Send notification to dispatcher - - :param method: remote method to call - :param engine_id: dispatcher to notify; None implies broadcast - ''' - - client = rpc_messaging.get_rpc_client(version=consts.RPC_API_VERSION) - - if engine_id: - # Notify specific dispatcher identified by engine_id - call_context = client.prepare( - version=consts.RPC_API_VERSION, - topic=consts.ENGINE_DISPATCHER_TOPIC, - server=engine_id) - else: - # Broadcast to all disptachers - call_context = client.prepare( - version=consts.RPC_API_VERSION, - topic=consts.ENGINE_DISPATCHER_TOPIC) - - try: - # We don't use ctext parameter in action progress - # actually. But since RPCClient.call needs this param, - # we use oslo current context here. - call_context.call(oslo_context.get_current(), method, **kwargs) - return True - except oslo_messaging.MessagingTimeout: - return False - - -def start_action(engine_id=None, **kwargs): - return notify(START_ACTION, engine_id, **kwargs) diff --git a/evoque/engine/service.py b/evoque/engine/service.py deleted file mode 100644 index acb91ce..0000000 --- a/evoque/engine/service.py +++ /dev/null @@ -1,226 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import collections -import uuid - -import eventlet -from oslo_config import cfg -from oslo_log import log as logging -import oslo_messaging -from oslo_service import service -from oslo_service import threadgroup -from osprofiler import profiler -import six - -from evoque.common import consts -from evoque.common import messaging as rpc_messaging -from evoque.db import api as db_api -from evoque.engine import dispatcher -from evoque.i18n import _LE -from evoque.i18n import _LI - -LOG = logging.getLogger(__name__) - -CONF = cfg.CONF - - -class ThreadGroupManager(object): - - def __init__(self): - super(ThreadGroupManager, self).__init__() - self.groups = {} - self.events = collections.defaultdict(list) - - # Create dummy service task, because when there is nothing queued - # on self.tg the process exits - self.add_timer(60, self._service_task) - - def _service_task(self): - """ - This is a dummy task which gets queued on the service.Service - threadgroup. Without this service.Service sees nothing running - i.e has nothing to wait() on, so the process exits.. - This could also be used to trigger periodic non-stack-specific - housekeeping tasks - """ - pass - - def _serialize_profile_info(self): - prof = profiler.get() - trace_info = None - if prof: - trace_info = { - "hmac_key": prof.hmac_key, - "base_id": prof.get_base_id(), - "parent_id": prof.get_id() - } - return trace_info - - def _start_with_trace(self, trace, func, *args, **kwargs): - if trace: - profiler.init(**trace) - return func(*args, **kwargs) - - def start(self, stack_id, func, *args, **kwargs): - """ - Run the given method in a sub-thread. - """ - if stack_id not in self.groups: - self.groups[stack_id] = threadgroup.ThreadGroup() - return self.groups[stack_id].add_thread(self._start_with_trace, - self._serialize_profile_info(), - func, *args, **kwargs) - - def start_with_acquired_lock(self, stack, lock, func, *args, **kwargs): - """ - Run the given method in a sub-thread and release the provided lock - when the thread finishes. - - :param stack: Stack to be operated on - :type stack: heat.engine.parser.Stack - :param lock: The acquired stack lock - :type lock: heat.engine.stack_lock.StackLock - :param func: Callable to be invoked in sub-thread - :type func: function or instancemethod - :param args: Args to be passed to func - :param kwargs: Keyword-args to be passed to func - - """ - def release(gt): - """ - Callback function that will be passed to GreenThread.link(). - """ - lock.release() - - th = self.start(stack.id, func, *args, **kwargs) - th.link(release) - return th - - def add_timer(self, stack_id, func, *args, **kwargs): - """ - Define a periodic task, to be run in a separate thread, in the stack - threadgroups. Periodicity is cfg.CONF.periodic_interval - """ - if stack_id not in self.groups: - self.groups[stack_id] = threadgroup.ThreadGroup() - self.groups[stack_id].add_timer(60, - func, *args, **kwargs) - - def add_event(self, stack_id, event): - self.events[stack_id].append(event) - - def remove_event(self, gt, stack_id, event): - for e in self.events.pop(stack_id, []): - if e is not event: - self.add_event(stack_id, e) - - def stop_timers(self, stack_id): - if stack_id in self.groups: - self.groups[stack_id].stop_timers() - - def stop(self, stack_id, graceful=False): - '''Stop any active threads on a stack.''' - if stack_id in self.groups: - self.events.pop(stack_id, None) - threadgroup = self.groups.pop(stack_id) - threads = threadgroup.threads[:] - - threadgroup.stop(graceful) - threadgroup.wait() - - # Wait for link()ed functions (i.e. lock release) - links_done = dict((th, False) for th in threads) - - def mark_done(gt, th): - links_done[th] = True - - for th in threads: - th.link(mark_done, th) - while not all(six.itervalues(links_done)): - eventlet.sleep() - - def send(self, stack_id, message): - for event in self.events.pop(stack_id, []): - event.send(message) - - -class EngineService(service.Service): - '''Lifecycle manager for a running service engine. - - - All the methods in here are called from the RPC client. - - If a RPC call does not have a corresponding method here, an exception - will be thrown. - - Arguments to these calls are added dynamically and will be treated as - keyword arguments by the RPC client. - ''' - - def __init__(self, host, topic, manager=None): - - super(EngineService, self).__init__() - self.host = host - self.topic = topic - self.dispatcher_topic = consts.ENGINE_DISPATCHER_TOPIC - self.health_mgr_topic = consts.ENGINE_HEALTH_MGR_TOPIC - - # The following are initialized here, but assigned in start() which - # happens after the fork when spawning multiple worker processes - self.engine_id = None - self.TG = None - self.target = None - self._rpc_server = None - - def start(self): - self.engine_id = str(uuid.uuid4()) - self.TG = ThreadGroupManager() - - # create a dispatcher greenthread for this engine. - self.dispatcher = dispatcher.Dispatcher(self, - self.dispatcher_topic, - consts.RPC_API_VERSION, - self.TG) - LOG.info(_LI("Starting dispatcher for engine %s"), self.engine_id) - self.dispatcher.start() - - target = oslo_messaging.Target(version=consts.RPC_API_VERSION, - server=self.host, - topic=self.topic) - self.target = target - self._rpc_server = rpc_messaging.get_rpc_server(target, self) - self._rpc_server.start() - super(EngineService, self).start() - - def _stop_rpc_server(self): - # Stop RPC connection to prevent new requests - LOG.info(_LI("Stopping engine service...")) - try: - self._rpc_server.stop() - self._rpc_server.wait() - LOG.info(_LI('Engine service stopped successfully')) - except Exception as ex: - LOG.error(_LE('Failed to stop engine service: %s'), - six.text_type(ex)) - - def stop(self): - self._stop_rpc_server() - - # Notify dispatcher to stop all action threads it started. - LOG.info(_LI("Stopping dispatcher for engine %s"), self.engine_id) - self.dispatcher.stop() - - self.TG.stop() - super(EngineService, self).stop() - - def ticket_create(self, context, name): - values = {'name': name} - ticket = db_api.ticket_create(context, values) - return ticket diff --git a/evoque/rpc/__init__.py b/evoque/engine/ticket/__init__.py similarity index 100% rename from evoque/rpc/__init__.py rename to evoque/engine/ticket/__init__.py diff --git a/evoque/engine/ticket/api.py b/evoque/engine/ticket/api.py new file mode 100644 index 0000000..d265837 --- /dev/null +++ b/evoque/engine/ticket/api.py @@ -0,0 +1,22 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from evoque.common import rpc_service + + +class API(rpc_service.API): + def __init__(self, transport=None, context=None, topic=None): + super(API, self).__init__(transport, context, + topic="evoque-engine") + + def ticket_create(self, name): + return self._call('ticket_create', name=name) diff --git a/evoque/engine/ticket/endpoint.py b/evoque/engine/ticket/endpoint.py new file mode 100644 index 0000000..8e31386 --- /dev/null +++ b/evoque/engine/ticket/endpoint.py @@ -0,0 +1,24 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from evoque.db import api as db_api + + +class Handler(object): + + def __init__(self): + super(Handler, self).__init__() + + def ticket_create(self, context, name): + values = {'name': name} + ticket = db_api.ticket_create(context, values) + return ticket diff --git a/evoque/opts.py b/evoque/opts.py index f75306a..473f097 100644 --- a/evoque/opts.py +++ b/evoque/opts.py @@ -10,9 +10,18 @@ # License for the specific language governing permissions and limitations # under the License. +import multiprocessing +import socket + from oslo_config import cfg +try: + default_workers = multiprocessing.cpu_count() or 1 +except NotImplementedError: + default_workers = 1 + + def list_opts(): return [ ("api", ( @@ -25,7 +34,7 @@ def list_opts(): cfg.BoolOpt('pecan_debug', default=False, help='Toggle Pecan Debug Middleware.'), - cfg.IntOpt('workers', min=1, + cfg.IntOpt('workers', default=default_workers, help='Number of workers for Evoque API server. ' 'By default the available number of CPU is used.'), cfg.IntOpt('max_limit', @@ -33,9 +42,9 @@ def list_opts(): help=('The maximum number of items returned in a ' 'single response from a collection resource')), )), - (None, ( + ("DEFAULT", ( cfg.StrOpt('host', - default='0.0.0.0', + default=socket.getfqdn(), help='The listen IP for the Evoque engine server.'), )), ] diff --git a/evoque/rpc/client.py b/evoque/rpc/client.py deleted file mode 100644 index a8e789b..0000000 --- a/evoque/rpc/client.py +++ /dev/null @@ -1,72 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -''' -Client side of the evoque engine RPC API. -''' - -from evoque.common import consts -from evoque.common import messaging - - -class EngineClient(object): - '''Client side of the evoque engine rpc API.''' - - BASE_RPC_API_VERSION = '1.0' - - def __init__(self): - self._client = messaging.get_rpc_client( - topic=consts.ENGINE_TOPIC, - server='0.0.0.0', - version=self.BASE_RPC_API_VERSION) - - @staticmethod - def make_msg(method, **kwargs): - return method, kwargs - - def call(self, ctxt, msg, version=None): - method, kwargs = msg - if version is not None: - client = self._client.prepare(version=version) - else: - client = self._client - return client.call(ctxt, method, **kwargs) - - def cast(self, ctxt, msg, version=None): - method, kwargs = msg - if version is not None: - client = self._client.prepare(version=version) - else: - client = self._client - return client.cast(ctxt, method, **kwargs) - - def local_error_name(self, error): - '''Returns the name of the error with any _Remote postfix removed. - - :param error: Remote raised error to derive the name from. - ''' - - error_name = error.__class__.__name__ - return error_name.split('_Remote')[0] - - def ignore_error_named(self, error, name): - '''Raises the error unless its local name matches the supplied name - - :param error: Remote raised error to derive the local name from. - :param name: Name to compare local name to. - ''' - if self.local_error_name(error) != name: - raise error - - def ticket_create(self, ctxt, name): - return self.call(ctxt, self.make_msg('ticket_create', - name=name)) diff --git a/evoque/service.py b/evoque/service.py index 72b6303..5d7e705 100644 --- a/evoque/service.py +++ b/evoque/service.py @@ -11,35 +11,27 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging -import multiprocessing +import sys from oslo_config import cfg -from oslo_log import log +from oslo_log import log as logging from evoque import opts -LOG = log.getLogger(__name__) +LOG = logging.getLogger(__name__) -def prepare_service(args=None): - conf = cfg.ConfigOpts() - log.register_options(conf) +def prepare_service(): # Register Evoque options for group, options in opts.list_opts(): - conf.register_opts(list(options), - group=None if group == "DEFAULT" else group) + cfg.CONF.register_opts( + list(options), + group=None if group == "DEFAULT" else group) - try: - default_workers = multiprocessing.cpu_count() or 1 - except NotImplementedError: - default_workers = 1 + logging.register_options(cfg.CONF) + cfg.CONF(sys.argv[1:], project='evoque') - conf.set_default("workers", default_workers, group="api") + logging.setup(cfg.CONF, 'evoque') - conf(args, project='evoque', validate_default_values=True) - log.setup(conf, 'evoque') - conf.log_opt_values(LOG, logging.DEBUG) - - return conf + return cfg.CONF diff --git a/requirements.txt b/requirements.txt index 526f81e..9f8bfbe 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,6 +8,7 @@ oslo.db>=3.0.0 # Apache-2.0 oslo.i18n>=1.5.0 # Apache-2.0 oslo.log>=1.8.0 # Apache-2.0 oslo.messaging!=1.17.0,!=1.17.1,!=2.6.0,!=2.6.1,>=1.16.0 # Apache-2.0 +oslo.serialization>=1.10.0 # Apache-2.0 pecan>=1.0.0 SQLAlchemy<1.1.0,>=0.9.9 sqlalchemy-migrate>=0.9.6