Refactored Watcher API service

This patchset introduces the use of oslo.service to run the
Watcher API service.

Change-Id: I6c38a3c1a2b4dc47388876e4c0ba61b7447690bd
Related-Bug: #1541850
This commit is contained in:
Vincent Françoise 2016-03-18 11:12:48 +01:00
parent 87087e9add
commit a9e7251d0d
7 changed files with 216 additions and 200 deletions

View File

@ -6,6 +6,7 @@ enum34;python_version=='2.7' or python_version=='2.6' or python_version=='3.3' #
jsonpatch>=1.1 # BSD
keystoneauth1>=2.1.0 # Apache-2.0
keystonemiddleware!=4.1.0,>=4.0.0 # Apache-2.0
oslo.concurrency>=3.5.0 # Apache-2.0
oslo.config>=3.9.0 # Apache-2.0
oslo.context>=2.2.0 # Apache-2.0
oslo.db>=4.1.0 # Apache-2.0
@ -13,6 +14,7 @@ oslo.i18n>=2.1.0 # Apache-2.0
oslo.log>=1.14.0 # Apache-2.0
oslo.messaging>=4.5.0 # Apache-2.0
oslo.policy>=0.5.0 # Apache-2.0
oslo.reports>=0.6.0 # Apache-2.0
oslo.service>=1.0.0 # Apache-2.0
oslo.utils>=3.5.0 # Apache-2.0
PasteDeploy>=1.5.0 # MIT

View File

@ -19,6 +19,7 @@
from oslo_config import cfg
import pecan
from watcher._i18n import _
from watcher.api import acl
from watcher.api import config as api_config
from watcher.api import middleware
@ -27,16 +28,31 @@ from watcher.decision_engine.strategy.selection import default \
# Register options for the service
API_SERVICE_OPTS = [
cfg.IntOpt('port',
default=9322,
help='The port for the watcher API server'),
cfg.PortOpt('port',
default=9322,
help=_('The port for the watcher API server')),
cfg.StrOpt('host',
default='0.0.0.0',
help='The listen IP for the watcher API server'),
help=_('The listen IP for the watcher API server')),
cfg.IntOpt('max_limit',
default=1000,
help='The maximum number of items returned in a single '
'response from a collection resource.')
help=_('The maximum number of items returned in a single '
'response from a collection resource')),
cfg.IntOpt('workers',
min=1,
help=_('Number of workers for Watcher API service. '
'The default is equal to the number of CPUs available '
'if that can be determined, else a default worker '
'count of 1 is returned.')),
cfg.BoolOpt('enable_ssl_api',
default=False,
help=_("Enable the integrated stand-alone API to service "
"requests via HTTPS instead of HTTP. If there is a "
"front-end service performing HTTPS offloading from "
"the service, this option should be False; note, you "
"will want to change public API endpoint to represent "
"SSL termination URL with 'public_endpoint' option.")),
]
CONF = cfg.CONF
@ -68,3 +84,12 @@ def setup_app(config=None):
)
return acl.install(app, CONF, config.app.acl_public_routes)
class VersionSelectorApplication(object):
def __init__(self):
pc = get_pecan_config()
self.v1 = setup_app(config=pc)
def __call__(self, environ, start_response):
return self.v1(environ, start_response)

View File

@ -17,17 +17,15 @@
"""Starter script for the Watcher API service."""
import logging as std_logging
import os
import sys
from wsgiref import simple_server
from oslo_config import cfg
from oslo_log import log as logging
from oslo_reports import guru_meditation_report as gmr
from watcher._i18n import _
from watcher.api import app as api_app
from watcher.common import service
from watcher import version
LOG = logging.getLogger(__name__)
@ -37,15 +35,12 @@ CONF = cfg.CONF
def main():
service.prepare_service(sys.argv)
app = api_app.setup_app()
gmr.TextGuruMeditation.setup_autorun(version)
# Create the WSGI server and start it
host, port = cfg.CONF.api.host, cfg.CONF.api.port
srv = simple_server.make_server(host, port, app)
LOG.info(_('Starting server in PID %s') % os.getpid())
LOG.debug("Watcher configuration:")
cfg.CONF.log_opt_values(LOG, std_logging.DEBUG)
# Build and start the WSGI app
server = service.WSGIService(
'watcher-api', CONF.api.enable_ssl_api)
if host == '0.0.0.0':
LOG.info(_('serving on 0.0.0.0:%(port)s, '
@ -55,4 +50,6 @@ def main():
LOG.info(_('serving on http://%(host)s:%(port)s') %
dict(host=host, port=port))
srv.serve_forever()
launcher = service.process_launcher()
launcher.launch_service(server, workers=server.workers)
launcher.wait()

View File

@ -15,108 +15,39 @@
# under the License.
import logging
import signal
import socket
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_log import _options
from oslo_log import log
import oslo_messaging as messaging
from oslo_reports import opts as gmr_opts
from oslo_service import service
from oslo_utils import importutils
from oslo_service import wsgi
from watcher._i18n import _LE
from watcher._i18n import _LI
from watcher._i18n import _
from watcher.api import app
from watcher.common import config
from watcher.common import context
from watcher.common import rpc
from watcher.objects import base as objects_base
service_opts = [
cfg.IntOpt('periodic_interval',
default=60,
help='Seconds between running periodic tasks.'),
help=_('Seconds between running periodic tasks.')),
cfg.StrOpt('host',
default=socket.getfqdn(),
help='Name of this node. This can be an opaque identifier. '
'It is not necessarily a hostname, FQDN, or IP address. '
'However, the node name must be valid within '
'an AMQP key, and if using ZeroMQ, a valid '
'hostname, FQDN, or IP address.'),
help=_('Name of this node. This can be an opaque identifier. '
'It is not necessarily a hostname, FQDN, or IP address. '
'However, the node name must be valid within '
'an AMQP key, and if using ZeroMQ, a valid '
'hostname, FQDN, or IP address.')),
]
cfg.CONF.register_opts(service_opts)
CONF = cfg.CONF
LOG = log.getLogger(__name__)
class RPCService(service.Service):
def __init__(self, host, manager_module, manager_class):
super(RPCService, self).__init__()
self.host = host
manager_module = importutils.try_import(manager_module)
manager_class = getattr(manager_module, manager_class)
self.manager = manager_class(host, manager_module.MANAGER_TOPIC)
self.topic = self.manager.topic
self.rpcserver = None
self.deregister = True
def start(self):
super(RPCService, self).start()
admin_context = context.RequestContext('admin', 'admin', is_admin=True)
target = messaging.Target(topic=self.topic, server=self.host)
endpoints = [self.manager]
serializer = objects_base.IronicObjectSerializer()
self.rpcserver = rpc.get_server(target, endpoints, serializer)
self.rpcserver.start()
self.handle_signal()
self.manager.init_host()
self.tg.add_dynamic_timer(
self.manager.periodic_tasks,
periodic_interval_max=cfg.CONF.periodic_interval,
context=admin_context)
LOG.info(_LI('Created RPC server for service %(service)s on host '
'%(host)s.'),
{'service': self.topic, 'host': self.host})
def stop(self):
try:
self.rpcserver.stop()
self.rpcserver.wait()
except Exception as e:
LOG.exception(_LE('Service error occurred when stopping the '
'RPC server. Error: %s'), e)
try:
self.manager.del_host(deregister=self.deregister)
except Exception as e:
LOG.exception(_LE('Service error occurred when cleaning up '
'the RPC manager. Error: %s'), e)
super(RPCService, self).stop(graceful=True)
LOG.info(_LI('Stopped RPC server for service %(service)s on host '
'%(host)s.'),
{'service': self.topic, 'host': self.host})
def _handle_signal(self):
LOG.info(_LI('Got signal SIGUSR1. Not deregistering on next shutdown '
'of service %(service)s on host %(host)s.'),
{'service': self.topic, 'host': self.host})
self.deregister = False
def handle_signal(self):
"""Add a signal handler for SIGUSR1.
The handler ensures that the manager is not deregistered when it is
shutdown.
"""
signal.signal(signal.SIGUSR1, self._handle_signal)
_DEFAULT_LOG_LEVELS = ['amqp=WARN', 'amqplib=WARN', 'qpid.messaging=INFO',
'oslo.messaging=INFO', 'sqlalchemy=WARN',
'keystoneclient=INFO', 'stevedore=INFO',
@ -125,8 +56,50 @@ _DEFAULT_LOG_LEVELS = ['amqp=WARN', 'amqplib=WARN', 'qpid.messaging=INFO',
'glanceclient=WARN', 'watcher.openstack.common=WARN']
def prepare_service(argv=[], conf=cfg.CONF):
class WSGIService(service.ServiceBase):
"""Provides ability to launch Watcher API from wsgi app."""
def __init__(self, name, use_ssl=False):
"""Initialize, but do not start the WSGI server.
:param name: The name of the WSGI server given to the loader.
:param use_ssl: Wraps the socket in an SSL context if True.
"""
self.name = name
self.app = app.VersionSelectorApplication()
self.workers = (CONF.api.workers or
processutils.get_worker_count())
self.server = wsgi.Server(CONF, name, self.app,
host=CONF.api.host,
port=CONF.api.port,
use_ssl=use_ssl,
logger_name=name)
def start(self):
"""Start serving this service using loaded configuration"""
self.server.start()
def stop(self):
"""Stop serving this API"""
self.server.stop()
def wait(self):
"""Wait for the service to stop serving this API"""
self.server.wait()
def reset(self):
"""Reset server greenpool size to default"""
self.server.reset()
def process_launcher(conf=cfg.CONF):
return service.ProcessLauncher(conf)
def prepare_service(argv=(), conf=cfg.CONF):
log.register_options(conf)
gmr_opts.set_defaults(conf)
config.parse_args(argv)
cfg.set_defaults(_options.log_opts,
default_log_levels=_DEFAULT_LOG_LEVELS)

View File

@ -7,16 +7,46 @@
#, fuzzy
msgid ""
msgstr ""
"Project-Id-Version: python-watcher 0.25.1.dev3\n"
"Project-Id-Version: python-watcher 0.26.1.dev13\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2016-03-30 10:10+0200\n"
"POT-Creation-Date: 2016-04-22 10:31+0200\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.2.0\n"
"Generated-By: Babel 2.3.2\n"
#: watcher/api/app.py:33
msgid "The port for the watcher API server"
msgstr ""
#: watcher/api/app.py:36
msgid "The listen IP for the watcher API server"
msgstr ""
#: watcher/api/app.py:39
msgid ""
"The maximum number of items returned in a single response from a "
"collection resource"
msgstr ""
#: watcher/api/app.py:43
msgid ""
"Number of workers for Watcher API service. The default is equal to the "
"number of CPUs available if that can be determined, else a default worker"
" count of 1 is returned."
msgstr ""
#: watcher/api/app.py:50
msgid ""
"Enable the integrated stand-alone API to service requests via HTTPS "
"instead of HTTP. If there is a front-end service performing HTTPS "
"offloading from the service, this option should be False; note, you will "
"want to change public API endpoint to represent SSL termination URL with "
"'public_endpoint' option."
msgstr ""
#: watcher/api/controllers/v1/action.py:364
msgid "Cannot create an action directly"
@ -30,41 +60,41 @@ msgstr ""
msgid "Cannot delete an action directly"
msgstr ""
#: watcher/api/controllers/v1/action_plan.py:102
#: watcher/api/controllers/v1/action_plan.py:87
#, python-format
msgid "Invalid state: %(state)s"
msgstr ""
#: watcher/api/controllers/v1/action_plan.py:422
#: watcher/api/controllers/v1/action_plan.py:407
#, python-format
msgid "State transition not allowed: (%(initial_state)s -> %(new_state)s)"
msgstr ""
#: watcher/api/controllers/v1/audit.py:359
#: watcher/api/controllers/v1/audit.py:365
msgid "The audit template UUID or name specified is invalid"
msgstr ""
#: watcher/api/controllers/v1/types.py:148
#: watcher/api/controllers/v1/types.py:123
#, python-format
msgid "%s is not JSON serializable"
msgstr ""
#: watcher/api/controllers/v1/types.py:184
#: watcher/api/controllers/v1/types.py:159
#, python-format
msgid "Wrong type. Expected '%(type)s', got '%(value)s'"
msgstr ""
#: watcher/api/controllers/v1/types.py:223
#: watcher/api/controllers/v1/types.py:198
#, python-format
msgid "'%s' is an internal attribute and can not be updated"
msgstr ""
#: watcher/api/controllers/v1/types.py:227
#: watcher/api/controllers/v1/types.py:202
#, python-format
msgid "'%s' is a mandatory attribute and can not be removed"
msgstr ""
#: watcher/api/controllers/v1/types.py:232
#: watcher/api/controllers/v1/types.py:207
msgid "'add' and 'replace' operations needs value"
msgstr ""
@ -135,22 +165,21 @@ msgstr ""
msgid "Oops! We need disaster recover plan"
msgstr ""
#: watcher/cmd/api.py:46 watcher/cmd/applier.py:39
#: watcher/cmd/decisionengine.py:40
#, python-format
msgid "Starting server in PID %s"
msgstr ""
#: watcher/cmd/api.py:51
#: watcher/cmd/api.py:46
#, python-format
msgid "serving on 0.0.0.0:%(port)s, view at http://127.0.0.1:%(port)s"
msgstr ""
#: watcher/cmd/api.py:55
#: watcher/cmd/api.py:50
#, python-format
msgid "serving on http://%(host)s:%(port)s"
msgstr ""
#: watcher/cmd/applier.py:39 watcher/cmd/decisionengine.py:40
#, python-format
msgid "Starting server in PID %s"
msgstr ""
#: watcher/common/clients.py:29
msgid "Version of Nova API to use in novaclient."
msgstr ""
@ -252,149 +281,139 @@ msgstr ""
#: watcher/common/exception.py:184
#, python-format
msgid "Audit %(audit)s could not be found"
msgid "Audit type %(audit_type)s could not be found"
msgstr ""
#: watcher/common/exception.py:188
#, python-format
msgid "An audit with UUID %(uuid)s already exists"
msgid "Audit %(audit)s could not be found"
msgstr ""
#: watcher/common/exception.py:192
#, python-format
msgid "Audit %(audit)s is referenced by one or multiple action plans"
msgid "An audit with UUID %(uuid)s already exists"
msgstr ""
#: watcher/common/exception.py:197
#: watcher/common/exception.py:196
#, python-format
msgid "ActionPlan %(action_plan)s could not be found"
msgid "Audit %(audit)s is referenced by one or multiple action plans"
msgstr ""
#: watcher/common/exception.py:201
#, python-format
msgid "An action plan with UUID %(uuid)s already exists"
msgid "ActionPlan %(action_plan)s could not be found"
msgstr ""
#: watcher/common/exception.py:205
#, python-format
msgid "Action Plan %(action_plan)s is referenced by one or multiple actions"
msgid "An action plan with UUID %(uuid)s already exists"
msgstr ""
#: watcher/common/exception.py:210
#: watcher/common/exception.py:209
#, python-format
msgid "Action %(action)s could not be found"
msgid "Action Plan %(action_plan)s is referenced by one or multiple actions"
msgstr ""
#: watcher/common/exception.py:214
#, python-format
msgid "An action with UUID %(uuid)s already exists"
msgid "Action %(action)s could not be found"
msgstr ""
#: watcher/common/exception.py:218
#, python-format
msgid "An action with UUID %(uuid)s already exists"
msgstr ""
#: watcher/common/exception.py:222
#, python-format
msgid "Action plan %(action_plan)s is referenced by one or multiple goals"
msgstr ""
#: watcher/common/exception.py:223
#: watcher/common/exception.py:227
msgid "Filtering actions on both audit and action-plan is prohibited"
msgstr ""
#: watcher/common/exception.py:232
#: watcher/common/exception.py:236
#, python-format
msgid "Couldn't apply patch '%(patch)s'. Reason: %(reason)s"
msgstr ""
#: watcher/common/exception.py:238
#: watcher/common/exception.py:242
#, python-format
msgid "Workflow execution error: %(error)s"
msgstr ""
#: watcher/common/exception.py:242
#: watcher/common/exception.py:246
msgid "Illegal argument"
msgstr ""
#: watcher/common/exception.py:246
#: watcher/common/exception.py:250
msgid "No such metric"
msgstr ""
#: watcher/common/exception.py:250
#: watcher/common/exception.py:254
msgid "No rows were returned"
msgstr ""
#: watcher/common/exception.py:254
#: watcher/common/exception.py:258
#, python-format
msgid "%(client)s connection failed. Reason: %(reason)s"
msgstr ""
#: watcher/common/exception.py:258
#: watcher/common/exception.py:262
msgid "'Keystone API endpoint is missing''"
msgstr ""
#: watcher/common/exception.py:262
#: watcher/common/exception.py:266
msgid "The list of hypervisor(s) in the cluster is empty"
msgstr ""
#: watcher/common/exception.py:266
#: watcher/common/exception.py:270
msgid "The metrics resource collector is not defined"
msgstr ""
#: watcher/common/exception.py:270
#: watcher/common/exception.py:274
msgid "the cluster state is not defined"
msgstr ""
#: watcher/common/exception.py:276
#: watcher/common/exception.py:280
#, python-format
msgid "The instance '%(name)s' is not found"
msgstr ""
#: watcher/common/exception.py:280
msgid "The hypervisor is not found"
msgstr ""
#: watcher/common/exception.py:284
#, python-format
msgid "Error loading plugin '%(name)s'"
msgid "The hypervisor is not found"
msgstr ""
#: watcher/common/exception.py:288
#, python-format
msgid "The identifier '%(name)s' is a reserved word"
msgid "Error loading plugin '%(name)s'"
msgstr ""
#: watcher/common/exception.py:292
#, python-format
msgid "The %(name)s resource %(id)s is not soft deleted"
msgid "The identifier '%(name)s' is a reserved word"
msgstr ""
#: watcher/common/exception.py:296
#, python-format
msgid "The %(name)s resource %(id)s is not soft deleted"
msgstr ""
#: watcher/common/exception.py:300
msgid "Limit should be positive"
msgstr ""
#: watcher/common/service.py:83
#, python-format
msgid "Created RPC server for service %(service)s on host %(host)s."
#: watcher/common/service.py:36
msgid "Seconds between running periodic tasks."
msgstr ""
#: watcher/common/service.py:92
#, python-format
msgid "Service error occurred when stopping the RPC server. Error: %s"
msgstr ""
#: watcher/common/service.py:97
#, python-format
msgid "Service error occurred when cleaning up the RPC manager. Error: %s"
msgstr ""
#: watcher/common/service.py:101
#, python-format
msgid "Stopped RPC server for service %(service)s on host %(host)s."
msgstr ""
#: watcher/common/service.py:106
#, python-format
#: watcher/common/service.py:39
msgid ""
"Got signal SIGUSR1. Not deregistering on next shutdown of service "
"%(service)s on host %(host)s."
"Name of this node. This can be an opaque identifier. It is not "
"necessarily a hostname, FQDN, or IP address. However, the node name must "
"be valid within an AMQP key, and if using ZeroMQ, a valid hostname, FQDN,"
" or IP address."
msgstr ""
#: watcher/common/utils.py:53
@ -489,25 +508,25 @@ msgstr ""
msgid "Purge process completed"
msgstr ""
#: watcher/db/sqlalchemy/api.py:362
#: watcher/db/sqlalchemy/api.py:361
msgid ""
"Multiple audit templates exist with the same name. Please use the audit "
"template uuid instead"
msgstr ""
#: watcher/db/sqlalchemy/api.py:384
#: watcher/db/sqlalchemy/api.py:383
msgid "Cannot overwrite UUID for an existing Audit Template."
msgstr ""
#: watcher/db/sqlalchemy/api.py:495
#: watcher/db/sqlalchemy/api.py:494
msgid "Cannot overwrite UUID for an existing Audit."
msgstr ""
#: watcher/db/sqlalchemy/api.py:588
#: watcher/db/sqlalchemy/api.py:587
msgid "Cannot overwrite UUID for an existing Action."
msgstr ""
#: watcher/db/sqlalchemy/api.py:699
#: watcher/db/sqlalchemy/api.py:698
msgid "Cannot overwrite UUID for an existing Action Plan."
msgstr ""
@ -517,12 +536,12 @@ msgid ""
"instead"
msgstr ""
#: watcher/decision_engine/model/model_root.py:37
#: watcher/decision_engine/model/model_root.py:42
#: watcher/decision_engine/model/model_root.py:33
#: watcher/decision_engine/model/model_root.py:38
msgid "'obj' argument type is not valid"
msgstr ""
#: watcher/decision_engine/planner/default.py:79
#: watcher/decision_engine/planner/default.py:78
msgid "The action plan is empty"
msgstr ""
@ -563,22 +582,22 @@ msgstr ""
msgid "No proper target host could be found"
msgstr ""
#: watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py:104
#: watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py:105
#, python-format
msgid "Unexpexted resource state type, state=%(state)s, state_type=%(st)s."
msgstr ""
#: watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py:156
#: watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py:157
#, python-format
msgid "Cannot live migrate: vm_uuid=%(vm_uuid)s, state=%(vm_state)s."
msgstr ""
#: watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py:240
#: watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py:241
#, python-format
msgid "No values returned by %(resource_id)s for memory.usage or disk.root.size"
msgstr ""
#: watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py:489
#: watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py:490
msgid "Executing Smart Strategy"
msgstr ""

View File

@ -16,22 +16,22 @@
from __future__ import absolute_import
from __future__ import unicode_literals
from six.moves.socketserver import BaseServer
import types
from wsgiref import simple_server
from mock import Mock
from mock import patch
import mock
from oslo_config import cfg
from oslo_service import wsgi
from pecan.testing import load_test_app
from watcher.api import config as api_config
from watcher.cmd import api
from watcher.tests.base import BaseTestCase
from watcher.common import service
from watcher.tests import base
class TestApi(BaseTestCase):
class TestApi(base.BaseTestCase):
def setUp(self):
super(TestApi, self).setUp()
@ -48,19 +48,18 @@ class TestApi(BaseTestCase):
super(TestApi, self).tearDown()
self.conf._parse_cli_opts = self._parse_cli_opts
@patch("watcher.api.app.pecan.make_app")
@patch.object(BaseServer, "serve_forever", Mock())
@patch.object(simple_server, "make_server")
def test_run_api_app(self, m_make, m_make_app):
@mock.patch.object(wsgi, "Server", mock.Mock())
@mock.patch("watcher.api.app.pecan.make_app")
@mock.patch.object(service, "process_launcher")
def test_run_api_app(self, m_launcher, m_make_app):
m_make_app.return_value = load_test_app(config=api_config.PECAN_CONFIG)
api.main()
self.assertEqual(1, m_make.call_count)
self.assertEqual(1, m_launcher.call_count)
@patch("watcher.api.app.pecan.make_app")
@patch.object(BaseServer, "serve_forever", Mock())
@patch.object(simple_server, "make_server")
def test_run_api_app_serve_specific_address(self, m_make, m_make_app):
@mock.patch("watcher.api.app.pecan.make_app")
@mock.patch.object(service, "process_launcher")
def test_run_api_app_serve_specific_address(self, m_launcher, m_make_app):
cfg.CONF.set_default("host", "localhost", group="api")
m_make_app.return_value = load_test_app(config=api_config.PECAN_CONFIG)
api.main()
self.assertEqual(1, m_make.call_count)
self.assertEqual(1, m_launcher.call_count)

View File

@ -17,3 +17,4 @@
import pbr.version
version_info = pbr.version.VersionInfo('python-watcher')
version_string = version_info.version_string