diff --git a/openstack-common.conf b/openstack-common.conf index 5b93450004..fdb9acf0b6 100644 --- a/openstack-common.conf +++ b/openstack-common.conf @@ -2,16 +2,11 @@ # The list of modules to copy from oslo-incubator module=context -module=eventlet_backdoor module=local module=log -module=loopingcall module=notifier module=pastedeploy -module=periodic_task module=rpc -module=service -module=threadgroup module=versionutils # The base module to hold the copy of openstack.common diff --git a/requirements.txt b/requirements.txt index fa89f962a1..baf41833cb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -31,6 +31,7 @@ pexpect!=3.3,>=3.1 # ISC License oslo.config>=1.11.0 # Apache-2.0 oslo.i18n>=1.5.0 # Apache-2.0 oslo.serialization>=1.4.0 # Apache-2.0 +oslo.service>=0.1.0 # Apache-2.0 oslo.utils>=1.9.0 # Apache-2.0 oslo.concurrency>=2.1.0 # Apache-2.0 MySQL-python;python_version=='2.7' diff --git a/trove/cmd/conductor.py b/trove/cmd/conductor.py index c398e860aa..62fd76894b 100755 --- a/trove/cmd/conductor.py +++ b/trove/cmd/conductor.py @@ -13,6 +13,8 @@ # License for the specific language governing permissions and limitations # under the License. from oslo_concurrency import processutils +from oslo_service import service as openstack_service + from trove.cmd.common import with_initialize @@ -20,12 +22,11 @@ from trove.cmd.common import with_initialize def main(conf): from trove.common.rpc import service as rpc_service from trove.common.rpc import version as rpc_version - from trove.openstack.common import service as openstack_service topic = conf.conductor_queue server = rpc_service.RpcService( manager=conf.conductor_manager, topic=topic, rpc_api_version=rpc_version.RPC_API_VERSION) workers = conf.trove_conductor_workers or processutils.get_worker_count() - launcher = openstack_service.launch(server, workers=workers) + launcher = openstack_service.launch(conf, server, workers=workers) launcher.wait() diff --git a/trove/cmd/guest.py b/trove/cmd/guest.py index efa8f91fc3..b401daf697 100755 --- a/trove/cmd/guest.py +++ b/trove/cmd/guest.py @@ -25,11 +25,11 @@ gettext.install('trove', unicode=1) import sys from oslo_config import cfg as openstack_cfg +from oslo_service import service as openstack_service from trove.common import cfg from trove.common import debug_utils from trove.openstack.common import log as logging -from trove.openstack.common import service as openstack_service CONF = cfg.CONF # The guest_id opt definition must match the one in common/cfg.py @@ -66,5 +66,5 @@ def main(): manager=manager, host=CONF.guest_id, rpc_api_version=rpc_version.RPC_API_VERSION) - launcher = openstack_service.launch(server) + launcher = openstack_service.launch(CONF, server) launcher.wait() diff --git a/trove/cmd/taskmanager.py b/trove/cmd/taskmanager.py index 9413307e44..3aa4ef97d3 100755 --- a/trove/cmd/taskmanager.py +++ b/trove/cmd/taskmanager.py @@ -13,6 +13,8 @@ # License for the specific language governing permissions and limitations # under the License. from oslo_config import cfg as openstack_cfg +from oslo_service import service as openstack_service + from trove.cmd.common import with_initialize @@ -22,12 +24,11 @@ extra_opts = [openstack_cfg.StrOpt('taskmanager_manager')] def startup(conf, topic): from trove.common.rpc import service as rpc_service from trove.common.rpc import version as rpc_version - from trove.openstack.common import service as openstack_service server = rpc_service.RpcService( manager=conf.taskmanager_manager, topic=topic, rpc_api_version=rpc_version.RPC_API_VERSION) - launcher = openstack_service.launch(server) + launcher = openstack_service.launch(conf, server) launcher.wait() diff --git a/trove/common/base_wsgi.py b/trove/common/base_wsgi.py index 997b543971..a3e6a5edde 100644 --- a/trove/common/base_wsgi.py +++ b/trove/common/base_wsgi.py @@ -28,6 +28,8 @@ import time import eventlet.wsgi from oslo_config import cfg +from oslo_service import service +from oslo_service import sslutils import routes import routes.middleware import webob.dec @@ -39,8 +41,6 @@ from trove.openstack.common import exception from trove.openstack.common.gettextutils import _ from trove.openstack.common import jsonutils from trove.openstack.common import log as logging -from trove.openstack.common import service -from trove.openstack.common import sslutils from trove.openstack.common import xmlutils socket_opts = [ @@ -70,7 +70,7 @@ class Service(service.Service): Provides a Service API for wsgi servers. This gives us the ability to launch wsgi servers with the - Launcher classes in service.py. + Launcher classes in oslo_service.service.py. """ def __init__(self, application, port, @@ -100,8 +100,8 @@ class Service(service.Service): sock = eventlet.listen(bind_addr, backlog=backlog, family=family) - if sslutils.is_enabled(): - sock = sslutils.wrap(sock) + if sslutils.is_enabled(CONF): + sock = sslutils.wrap(CONF, sock) except socket.error as err: if err.args[0] != errno.EADDRINUSE: diff --git a/trove/common/cfg.py b/trove/common/cfg.py index 31ba19efa4..67a12d18f0 100644 --- a/trove/common/cfg.py +++ b/trove/common/cfg.py @@ -76,7 +76,7 @@ common_opts = [ help='Trove authentication URL.'), cfg.StrOpt('host', default='0.0.0.0', help='Host to listen for RPC messages.'), - cfg.IntOpt('report_interval', default=10, + cfg.IntOpt('report_interval', default=30, help='The interval (in seconds) which periodic tasks are run.'), cfg.BoolOpt('trove_dns_support', default=False, help='Whether Trove should add DNS entries on create ' @@ -307,9 +307,8 @@ common_opts = [ help='Client to send Swift calls to.'), cfg.StrOpt('exists_notification_transformer', help='Transformer for exists notifications.'), - cfg.IntOpt('exists_notification_ticks', default=360, - help='Number of report_intervals to wait between pushing ' - 'events (see report_interval).'), + cfg.IntOpt('exists_notification_interval', default=3600, + help='Seconds to wait between pushing events.'), cfg.DictOpt('notification_service_id', default={'mysql': '2f3ff068-2bfb-4f70-9a9d-a6bb65bc084b', 'percona': 'fd1723f5-68d2-409c-994f-a4a197892a17', diff --git a/trove/common/rpc/service.py b/trove/common/rpc/service.py index 2708744f4f..fbf9809bb6 100644 --- a/trove/common/rpc/service.py +++ b/trove/common/rpc/service.py @@ -18,16 +18,17 @@ import inspect import os + import oslo_messaging as messaging +from oslo_service import loopingcall +from oslo_service import service from oslo_utils import importutils from osprofiler import profiler -from trove.openstack.common.gettextutils import _ -from trove.openstack.common import log as logging -from trove.openstack.common import loopingcall -from trove.openstack.common import service from trove.common import cfg from trove.common import profile +from trove.openstack.common.gettextutils import _ +from trove.openstack.common import log as logging from trove import rpc @@ -45,7 +46,6 @@ class RpcService(service.Service): self.topic = topic or self.binary.rpartition('trove-')[2] _manager = importutils.import_object(manager) self.manager_impl = profiler.trace_cls("rpc")(_manager) - self.report_interval = CONF.report_interval self.rpc_api_version = rpc_api_version or \ self.manager_impl.RPC_API_VERSION profile.setup_profiler(self.binary, self.host) @@ -64,11 +64,12 @@ class RpcService(service.Service): self.rpcserver.start() # TODO(hub-cap): Currently the context is none... do we _need_ it here? - if self.report_interval > 0: + report_interval = CONF.report_interval + if report_interval > 0: pulse = loopingcall.FixedIntervalLoopingCall( self.manager_impl.run_periodic_tasks, context=None) - pulse.start(interval=self.report_interval, - initial_delay=self.report_interval) + pulse.start(interval=report_interval, + initial_delay=report_interval) pulse.wait() def stop(self): diff --git a/trove/common/utils.py b/trove/common/utils.py index 7741109420..285f4447b5 100644 --- a/trove/common/utils.py +++ b/trove/common/utils.py @@ -26,6 +26,7 @@ import uuid from eventlet.timeout import Timeout import jinja2 from oslo_concurrency import processutils +from oslo_service import loopingcall from oslo_utils import importutils from oslo_utils import strutils from oslo_utils import timeutils @@ -36,7 +37,6 @@ from trove.common import cfg from trove.common import exception from trove.common.i18n import _ from trove.openstack.common import log as logging -from trove.openstack.common import loopingcall CONF = cfg.CONF diff --git a/trove/common/wsgi.py b/trove/common/wsgi.py index 312893d997..c97ff082e6 100644 --- a/trove/common/wsgi.py +++ b/trove/common/wsgi.py @@ -23,6 +23,7 @@ import uuid import eventlet.wsgi import jsonschema from oslo_serialization import jsonutils +from oslo_service import service import paste.urlmap import webob import webob.dec @@ -36,7 +37,6 @@ from trove.common.i18n import _ from trove.common import utils from trove.openstack.common import log as logging from trove.openstack.common import pastedeploy -from trove.openstack.common import service CONTEXT_KEY = 'trove.context' Router = base_wsgi.Router @@ -81,7 +81,7 @@ def launch(app_name, port, paste_config_file, data={}, app = pastedeploy.paste_deploy_app(paste_config_file, app_name, data) server = base_wsgi.Service(app, port, host=host, backlog=backlog, threads=threads) - return service.launch(server, workers) + return service.launch(CONF, server, workers) # Note: taken from Nova diff --git a/trove/conductor/manager.py b/trove/conductor/manager.py index f50cc868ee..01e5367ba3 100644 --- a/trove/conductor/manager.py +++ b/trove/conductor/manager.py @@ -13,6 +13,7 @@ # under the License. import oslo_messaging as messaging +from oslo_service import periodic_task from trove.backup import models as bkup_models from trove.common import cfg @@ -24,7 +25,6 @@ from trove.conductor.models import LastSeen from trove.extensions.mysql import models as mysql_models from trove.instance import models as t_models from trove.openstack.common import log as logging -from trove.openstack.common import periodic_task LOG = logging.getLogger(__name__) CONF = cfg.CONF @@ -35,7 +35,7 @@ class Manager(periodic_task.PeriodicTasks): target = messaging.Target(version=rpc_version.RPC_API_VERSION) def __init__(self): - super(Manager, self).__init__() + super(Manager, self).__init__(CONF) def _message_too_old(self, instance_id, method_name, sent): fields = { diff --git a/trove/extensions/mgmt/instances/models.py b/trove/extensions/mgmt/instances/models.py index e1fbc424ad..87d4d5fef6 100644 --- a/trove/extensions/mgmt/instances/models.py +++ b/trove/extensions/mgmt/instances/models.py @@ -190,7 +190,7 @@ class NotificationTransformer(object): audit_start = utils.isotime(now, subsecond=True) audit_end = utils.isotime( now + datetime.timedelta( - seconds=CONF.exists_notification_ticks * CONF.report_interval), + seconds=CONF.exists_notification_interval), subsecond=True) return audit_start, audit_end diff --git a/trove/guestagent/datastore/experimental/cassandra/manager.py b/trove/guestagent/datastore/experimental/cassandra/manager.py index c918605768..aa74b1e264 100644 --- a/trove/guestagent/datastore/experimental/cassandra/manager.py +++ b/trove/guestagent/datastore/experimental/cassandra/manager.py @@ -16,6 +16,8 @@ import os +from oslo_service import periodic_task + from trove.common import cfg from trove.common import exception from trove.common.i18n import _ @@ -23,7 +25,6 @@ from trove.guestagent.datastore.experimental.cassandra import service from trove.guestagent import dbaas from trove.guestagent import volume from trove.openstack.common import log as logging -from trove.openstack.common import periodic_task CONF = cfg.CONF LOG = logging.getLogger(__name__) @@ -35,8 +36,9 @@ class Manager(periodic_task.PeriodicTasks): def __init__(self): self.appStatus = service.CassandraAppStatus() self.app = service.CassandraApp(self.appStatus) + super(Manager, self).__init__(CONF) - @periodic_task.periodic_task(ticks_between_runs=3) + @periodic_task.periodic_task def update_status(self, context): """Update the status of the Cassandra service.""" self.appStatus.update() diff --git a/trove/guestagent/datastore/experimental/couchbase/manager.py b/trove/guestagent/datastore/experimental/couchbase/manager.py index 5c19565805..3789fe87d0 100644 --- a/trove/guestagent/datastore/experimental/couchbase/manager.py +++ b/trove/guestagent/datastore/experimental/couchbase/manager.py @@ -15,6 +15,8 @@ import os +from oslo_service import periodic_task + from trove.common import cfg from trove.common import exception from trove.common.i18n import _ @@ -25,7 +27,6 @@ from trove.guestagent.datastore.experimental.couchbase import system from trove.guestagent import dbaas from trove.guestagent import volume from trove.openstack.common import log as logging -from trove.openstack.common import periodic_task LOG = logging.getLogger(__name__) @@ -41,8 +42,9 @@ class Manager(periodic_task.PeriodicTasks): def __init__(self): self.appStatus = service.CouchbaseAppStatus() self.app = service.CouchbaseApp(self.appStatus) + super(Manager, self).__init__(CONF) - @periodic_task.periodic_task(ticks_between_runs=3) + @periodic_task.periodic_task def update_status(self, context): """ Updates the couchbase trove instance. It is decorated with diff --git a/trove/guestagent/datastore/experimental/couchdb/manager.py b/trove/guestagent/datastore/experimental/couchdb/manager.py index d1d7fe690c..b2a5bdf9e7 100644 --- a/trove/guestagent/datastore/experimental/couchdb/manager.py +++ b/trove/guestagent/datastore/experimental/couchdb/manager.py @@ -15,6 +15,8 @@ import os +from oslo_service import periodic_task + from trove.common import cfg from trove.common import exception from trove.common.i18n import _ @@ -22,7 +24,6 @@ from trove.guestagent.datastore.experimental.couchdb import service from trove.guestagent import dbaas from trove.guestagent import volume from trove.openstack.common import log as logging -from trove.openstack.common import periodic_task LOG = logging.getLogger(__name__) CONF = cfg.CONF @@ -38,6 +39,7 @@ class Manager(periodic_task.PeriodicTasks): def __init__(self): self.appStatus = service.CouchDBAppStatus() self.app = service.CouchDBApp(self.appStatus) + super(Manager, self).__init__(CONF) def rpc_ping(self, context): LOG.debug("Responding to RPC ping.") @@ -70,7 +72,7 @@ class Manager(periodic_task.PeriodicTasks): self.app.complete_install_or_restart() LOG.info(_('Completed setup of CouchDB database instance.')) - @periodic_task.periodic_task(ticks_between_runs=3) + @periodic_task.periodic_task def update_status(self, context): """Update the status of the CouchDB service.""" self.appStatus.update() diff --git a/trove/guestagent/datastore/experimental/db2/manager.py b/trove/guestagent/datastore/experimental/db2/manager.py index e85eca60a5..6c431dd916 100644 --- a/trove/guestagent/datastore/experimental/db2/manager.py +++ b/trove/guestagent/datastore/experimental/db2/manager.py @@ -13,6 +13,8 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo_service import periodic_task + from trove.common import cfg from trove.common import exception from trove.guestagent.datastore.experimental.db2 import service @@ -20,7 +22,6 @@ from trove.guestagent import dbaas from trove.guestagent import volume from trove.openstack.common.gettextutils import _ from trove.openstack.common import log as logging -from trove.openstack.common import periodic_task LOG = logging.getLogger(__name__) CONF = cfg.CONF @@ -36,8 +37,9 @@ class Manager(periodic_task.PeriodicTasks): self.appStatus = service.DB2AppStatus() self.app = service.DB2App(self.appStatus) self.admin = service.DB2Admin() + super(Manager, self).__init__(CONF) - @periodic_task.periodic_task(ticks_between_runs=3) + @periodic_task.periodic_task def update_status(self, context): """ Updates the status of DB2 Trove instance. It is decorated diff --git a/trove/guestagent/datastore/experimental/mongodb/manager.py b/trove/guestagent/datastore/experimental/mongodb/manager.py index 57c5ea1918..a7e4663b0a 100644 --- a/trove/guestagent/datastore/experimental/mongodb/manager.py +++ b/trove/guestagent/datastore/experimental/mongodb/manager.py @@ -15,6 +15,7 @@ import os +from oslo_service import periodic_task from oslo_utils import netutils from trove.common import cfg @@ -28,7 +29,6 @@ from trove.guestagent.datastore.experimental.mongodb import system from trove.guestagent import dbaas from trove.guestagent import volume from trove.openstack.common import log as logging -from trove.openstack.common import periodic_task LOG = logging.getLogger(__name__) @@ -41,8 +41,9 @@ class Manager(periodic_task.PeriodicTasks): def __init__(self): self.status = service.MongoDBAppStatus() self.app = service.MongoDBApp(self.status) + super(Manager, self).__init__(CONF) - @periodic_task.periodic_task(ticks_between_runs=3) + @periodic_task.periodic_task def update_status(self, context): """Update the status of the MongoDB service.""" self.status.update() diff --git a/trove/guestagent/datastore/experimental/postgresql/manager.py b/trove/guestagent/datastore/experimental/postgresql/manager.py index 0a930e19b3..d6a19db83c 100644 --- a/trove/guestagent/datastore/experimental/postgresql/manager.py +++ b/trove/guestagent/datastore/experimental/postgresql/manager.py @@ -16,6 +16,9 @@ import os +from oslo_config import cfg as os_cfg +from oslo_service import periodic_task + from .service.config import PgSqlConfig from .service.database import PgSqlDatabase from .service.install import PgSqlInstall @@ -27,7 +30,6 @@ from trove.guestagent import backup from trove.guestagent import dbaas from trove.guestagent import volume from trove.openstack.common import log as logging -from trove.openstack.common import periodic_task LOG = logging.getLogger(__name__) @@ -44,9 +46,16 @@ class Manager( ): def __init__(self, *args, **kwargs): - super(Manager, self).__init__(*args, **kwargs) + if len(args) and isinstance(args[0], os_cfg.ConfigOpts): + conf = args[0] + elif 'conf' in kwargs: + conf = kwargs['conf'] + else: + conf = CONF - @periodic_task.periodic_task(ticks_between_runs=3) + super(Manager, self).__init__(conf) + + @periodic_task.periodic_task def update_status(self, context): PgSqlAppStatus.get().update() diff --git a/trove/guestagent/datastore/experimental/redis/manager.py b/trove/guestagent/datastore/experimental/redis/manager.py index 7d68100d0a..3b15055cdb 100644 --- a/trove/guestagent/datastore/experimental/redis/manager.py +++ b/trove/guestagent/datastore/experimental/redis/manager.py @@ -13,6 +13,8 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo_service import periodic_task + from trove.common import cfg from trove.common import exception from trove.common.i18n import _ @@ -25,7 +27,6 @@ from trove.guestagent.datastore.experimental.redis.service import ( from trove.guestagent import dbaas from trove.guestagent import volume from trove.openstack.common import log as logging -from trove.openstack.common import periodic_task LOG = logging.getLogger(__name__) @@ -39,7 +40,10 @@ class Manager(periodic_task.PeriodicTasks): based off of the service_type of the trove instance """ - @periodic_task.periodic_task(ticks_between_runs=3) + def __init__(self): + super(Manager, self).__init__(CONF) + + @periodic_task.periodic_task def update_status(self, context): """ Updates the redis trove instance. It is decorated with diff --git a/trove/guestagent/datastore/experimental/vertica/manager.py b/trove/guestagent/datastore/experimental/vertica/manager.py index e9dee34565..68b251e4ed 100644 --- a/trove/guestagent/datastore/experimental/vertica/manager.py +++ b/trove/guestagent/datastore/experimental/vertica/manager.py @@ -13,6 +13,8 @@ import os +from oslo_service import periodic_task + from trove.common import cfg from trove.common import exception from trove.common.i18n import _ @@ -23,7 +25,6 @@ from trove.guestagent.datastore.experimental.vertica.service import VerticaApp from trove.guestagent import dbaas from trove.guestagent import volume from trove.openstack.common import log as logging -from trove.openstack.common import periodic_task LOG = logging.getLogger(__name__) CONF = cfg.CONF @@ -35,8 +36,9 @@ class Manager(periodic_task.PeriodicTasks): def __init__(self): self.appStatus = VerticaAppStatus() self.app = VerticaApp(self.appStatus) + super(Manager, self).__init__(CONF) - @periodic_task.periodic_task(ticks_between_runs=3) + @periodic_task.periodic_task def update_status(self, context): """Update the status of the Vertica service.""" self.appStatus.update() diff --git a/trove/guestagent/datastore/mysql/manager.py b/trove/guestagent/datastore/mysql/manager.py index cfe4b4a1e7..35d1b962e9 100644 --- a/trove/guestagent/datastore/mysql/manager.py +++ b/trove/guestagent/datastore/mysql/manager.py @@ -18,6 +18,8 @@ import os +from oslo_service import periodic_task + from trove.common import cfg from trove.common import exception from trove.common.i18n import _ @@ -31,7 +33,6 @@ from trove.guestagent import dbaas from trove.guestagent.strategies.replication import get_replication_strategy from trove.guestagent import volume from trove.openstack.common import log as logging -from trove.openstack.common import periodic_task LOG = logging.getLogger(__name__) @@ -44,7 +45,11 @@ REPLICATION_STRATEGY_CLASS = get_replication_strategy(REPLICATION_STRATEGY, class Manager(periodic_task.PeriodicTasks): - @periodic_task.periodic_task(ticks_between_runs=3) + + def __init__(self): + super(Manager, self).__init__(CONF) + + @periodic_task.periodic_task def update_status(self, context): """Update the status of the MySQL service.""" MySqlAppStatus.get().update() diff --git a/trove/openstack/common/eventlet_backdoor.py b/trove/openstack/common/eventlet_backdoor.py deleted file mode 100644 index 49dfd95e5b..0000000000 --- a/trove/openstack/common/eventlet_backdoor.py +++ /dev/null @@ -1,152 +0,0 @@ -# Copyright (c) 2012 OpenStack Foundation. -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from __future__ import print_function - -import copy -import errno -import gc -import os -import pprint -import socket -import sys -import traceback - -import eventlet -import eventlet.backdoor -import greenlet -from oslo_config import cfg - -from trove.openstack.common._i18n import _LI -from trove.openstack.common import log as logging - -help_for_backdoor_port = ( - "Acceptable values are 0, , and :, where 0 results " - "in listening on a random tcp port number; results in listening " - "on the specified port number (and not enabling backdoor if that port " - "is in use); and : results in listening on the smallest " - "unused port number within the specified range of port numbers. The " - "chosen port is displayed in the service's log file.") -eventlet_backdoor_opts = [ - cfg.StrOpt('backdoor_port', - help="Enable eventlet backdoor. %s" % help_for_backdoor_port) -] - -CONF = cfg.CONF -CONF.register_opts(eventlet_backdoor_opts) -LOG = logging.getLogger(__name__) - - -def list_opts(): - """Entry point for oslo.config-generator. - """ - return [(None, copy.deepcopy(eventlet_backdoor_opts))] - - -class EventletBackdoorConfigValueError(Exception): - def __init__(self, port_range, help_msg, ex): - msg = ('Invalid backdoor_port configuration %(range)s: %(ex)s. ' - '%(help)s' % - {'range': port_range, 'ex': ex, 'help': help_msg}) - super(EventletBackdoorConfigValueError, self).__init__(msg) - self.port_range = port_range - - -def _dont_use_this(): - print("Don't use this, just disconnect instead") - - -def _find_objects(t): - return [o for o in gc.get_objects() if isinstance(o, t)] - - -def _print_greenthreads(): - for i, gt in enumerate(_find_objects(greenlet.greenlet)): - print(i, gt) - traceback.print_stack(gt.gr_frame) - print() - - -def _print_nativethreads(): - for threadId, stack in sys._current_frames().items(): - print(threadId) - traceback.print_stack(stack) - print() - - -def _parse_port_range(port_range): - if ':' not in port_range: - start, end = port_range, port_range - else: - start, end = port_range.split(':', 1) - try: - start, end = int(start), int(end) - if end < start: - raise ValueError - return start, end - except ValueError as ex: - raise EventletBackdoorConfigValueError(port_range, ex, - help_for_backdoor_port) - - -def _listen(host, start_port, end_port, listen_func): - try_port = start_port - while True: - try: - return listen_func((host, try_port)) - except socket.error as exc: - if (exc.errno != errno.EADDRINUSE or - try_port >= end_port): - raise - try_port += 1 - - -def initialize_if_enabled(): - backdoor_locals = { - 'exit': _dont_use_this, # So we don't exit the entire process - 'quit': _dont_use_this, # So we don't exit the entire process - 'fo': _find_objects, - 'pgt': _print_greenthreads, - 'pnt': _print_nativethreads, - } - - if CONF.backdoor_port is None: - return None - - start_port, end_port = _parse_port_range(str(CONF.backdoor_port)) - - # NOTE(johannes): The standard sys.displayhook will print the value of - # the last expression and set it to __builtin__._, which overwrites - # the __builtin__._ that gettext sets. Let's switch to using pprint - # since it won't interact poorly with gettext, and it's easier to - # read the output too. - def displayhook(val): - if val is not None: - pprint.pprint(val) - sys.displayhook = displayhook - - sock = _listen('localhost', start_port, end_port, eventlet.listen) - - # In the case of backdoor port being zero, a port number is assigned by - # listen(). In any case, pull the port number out here. - port = sock.getsockname()[1] - LOG.info( - _LI('Eventlet backdoor listening on %(port)s for process %(pid)d') % - {'port': port, 'pid': os.getpid()} - ) - eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock, - locals=backdoor_locals) - return port diff --git a/trove/openstack/common/loopingcall.py b/trove/openstack/common/loopingcall.py deleted file mode 100644 index a8cd4b71ca..0000000000 --- a/trove/openstack/common/loopingcall.py +++ /dev/null @@ -1,147 +0,0 @@ -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# Copyright 2011 Justin Santa Barbara -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import sys -import time - -from eventlet import event -from eventlet import greenthread - -from trove.openstack.common._i18n import _LE, _LW -from trove.openstack.common import log as logging - -LOG = logging.getLogger(__name__) - -# NOTE(zyluo): This lambda function was declared to avoid mocking collisions -# with time.time() called in the standard logging module -# during unittests. -_ts = lambda: time.time() - - -class LoopingCallDone(Exception): - """Exception to break out and stop a LoopingCallBase. - - The poll-function passed to LoopingCallBase can raise this exception to - break out of the loop normally. This is somewhat analogous to - StopIteration. - - An optional return-value can be included as the argument to the exception; - this return-value will be returned by LoopingCallBase.wait() - - """ - - def __init__(self, retvalue=True): - """:param retvalue: Value that LoopingCallBase.wait() should return.""" - self.retvalue = retvalue - - -class LoopingCallBase(object): - def __init__(self, f=None, *args, **kw): - self.args = args - self.kw = kw - self.f = f - self._running = False - self.done = None - - def stop(self): - self._running = False - - def wait(self): - return self.done.wait() - - -class FixedIntervalLoopingCall(LoopingCallBase): - """A fixed interval looping call.""" - - def start(self, interval, initial_delay=None): - self._running = True - done = event.Event() - - def _inner(): - if initial_delay: - greenthread.sleep(initial_delay) - - try: - while self._running: - start = _ts() - self.f(*self.args, **self.kw) - end = _ts() - if not self._running: - break - delay = end - start - interval - if delay > 0: - LOG.warn(_LW('task %(func_name)s run outlasted ' - 'interval by %(delay).2f sec'), - {'func_name': repr(self.f), 'delay': delay}) - greenthread.sleep(-delay if delay < 0 else 0) - except LoopingCallDone as e: - self.stop() - done.send(e.retvalue) - except Exception: - LOG.exception(_LE('in fixed duration looping call')) - done.send_exception(*sys.exc_info()) - return - else: - done.send(True) - - self.done = done - - greenthread.spawn_n(_inner) - return self.done - - -class DynamicLoopingCall(LoopingCallBase): - """A looping call which sleeps until the next known event. - - The function called should return how long to sleep for before being - called again. - """ - - def start(self, initial_delay=None, periodic_interval_max=None): - self._running = True - done = event.Event() - - def _inner(): - if initial_delay: - greenthread.sleep(initial_delay) - - try: - while self._running: - idle = self.f(*self.args, **self.kw) - if not self._running: - break - - if periodic_interval_max is not None: - idle = min(idle, periodic_interval_max) - LOG.debug('Dynamic looping call %(func_name)s sleeping ' - 'for %(idle).02f seconds', - {'func_name': repr(self.f), 'idle': idle}) - greenthread.sleep(idle) - except LoopingCallDone as e: - self.stop() - done.send(e.retvalue) - except Exception: - LOG.exception(_LE('in dynamic looping call')) - done.send_exception(*sys.exc_info()) - return - else: - done.send(True) - - self.done = done - - greenthread.spawn(_inner) - return self.done diff --git a/trove/openstack/common/periodic_task.py b/trove/openstack/common/periodic_task.py deleted file mode 100644 index 4abfe96653..0000000000 --- a/trove/openstack/common/periodic_task.py +++ /dev/null @@ -1,115 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from trove.openstack.common.gettextutils import _ -from trove.openstack.common import log as logging - -LOG = logging.getLogger(__name__) - - -def periodic_task(*args, **kwargs): - """Decorator to indicate that a method is a periodic task. - - This decorator can be used in two ways: - - 1. Without arguments '@periodic_task', this will be run on every tick - of the periodic scheduler. - - 2. With arguments, @periodic_task(ticks_between_runs=N), this will be - run on every N ticks of the periodic scheduler. - """ - def decorator(f): - f._periodic_task = True - f._ticks_between_runs = kwargs.pop('ticks_between_runs', 0) - return f - - # NOTE(sirp): The `if` is necessary to allow the decorator to be used with - # and without parens. - # - # In the 'with-parens' case (with kwargs present), this function needs to - # return a decorator function since the interpreter will invoke it like: - # - # periodic_task(*args, **kwargs)(f) - # - # In the 'without-parens' case, the original function will be passed - # in as the first argument, like: - # - # periodic_task(f) - if kwargs: - return decorator - else: - return decorator(args[0]) - - -class _PeriodicTasksMeta(type): - def __init__(cls, names, bases, dict_): - """Metaclass that allows us to collect decorated periodic tasks.""" - super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_) - - # NOTE(sirp): if the attribute is not present then we must be the base - # class, so, go ahead and initialize it. If the attribute is present, - # then we're a subclass so make a copy of it so we don't step on our - # parent's toes. - try: - cls._periodic_tasks = cls._periodic_tasks[:] - except AttributeError: - cls._periodic_tasks = [] - - try: - cls._ticks_to_skip = cls._ticks_to_skip.copy() - except AttributeError: - cls._ticks_to_skip = {} - - # This uses __dict__ instead of - # inspect.getmembers(cls, inspect.ismethod) so only the methods of the - # current class are added when this class is scanned, and base classes - # are not added redundantly. - for value in cls.__dict__.values(): - if getattr(value, '_periodic_task', False): - task = value - name = task.__name__ - cls._periodic_tasks.append((name, task)) - cls._ticks_to_skip[name] = task._ticks_between_runs - - -class PeriodicTasks(object): - __metaclass__ = _PeriodicTasksMeta - - def run_periodic_tasks(self, context, raise_on_error=False): - """Tasks to be run at a periodic interval.""" - for task_name, task in self._periodic_tasks: - full_task_name = '.'.join([self.__class__.__name__, task_name]) - - ticks_to_skip = self._ticks_to_skip[task_name] - if ticks_to_skip > 0: - LOG.debug(_("Skipping %(full_task_name)s, %(ticks_to_skip)s" - " ticks left until next run"), - dict(full_task_name=full_task_name, - ticks_to_skip=ticks_to_skip)) - self._ticks_to_skip[task_name] -= 1 - continue - - self._ticks_to_skip[task_name] = task._ticks_between_runs - LOG.debug(_("Running periodic task %(full_task_name)s"), - dict(full_task_name=full_task_name)) - - try: - task(self, context) - except Exception as e: - if raise_on_error: - raise - LOG.exception(_("Error during %(full_task_name)s:" - " %(e)s"), - dict(e=e, full_task_name=full_task_name)) diff --git a/trove/openstack/common/service.py b/trove/openstack/common/service.py deleted file mode 100644 index a76984785b..0000000000 --- a/trove/openstack/common/service.py +++ /dev/null @@ -1,504 +0,0 @@ -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# Copyright 2011 Justin Santa Barbara -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Generic Node base class for all workers that run on hosts.""" - -import errno -import logging as std_logging -import os -import random -import signal -import sys -import time - -try: - # Importing just the symbol here because the io module does not - # exist in Python 2.6. - from io import UnsupportedOperation # noqa -except ImportError: - # Python 2.6 - UnsupportedOperation = None - -import eventlet -from eventlet import event -from oslo_config import cfg - -from trove.openstack.common import eventlet_backdoor -from trove.openstack.common._i18n import _LE, _LI, _LW -from trove.openstack.common import log as logging -from trove.openstack.common import systemd -from trove.openstack.common import threadgroup - - -CONF = cfg.CONF -LOG = logging.getLogger(__name__) - - -def _sighup_supported(): - return hasattr(signal, 'SIGHUP') - - -def _is_daemon(): - # The process group for a foreground process will match the - # process group of the controlling terminal. If those values do - # not match, or ioctl() fails on the stdout file handle, we assume - # the process is running in the background as a daemon. - # http://www.gnu.org/software/bash/manual/bashref.html#Job-Control-Basics - try: - is_daemon = os.getpgrp() != os.tcgetpgrp(sys.stdout.fileno()) - except OSError as err: - if err.errno == errno.ENOTTY: - # Assume we are a daemon because there is no terminal. - is_daemon = True - else: - raise - except UnsupportedOperation: - # Could not get the fileno for stdout, so we must be a daemon. - is_daemon = True - return is_daemon - - -def _is_sighup_and_daemon(signo): - if not (_sighup_supported() and signo == signal.SIGHUP): - # Avoid checking if we are a daemon, because the signal isn't - # SIGHUP. - return False - return _is_daemon() - - -def _signo_to_signame(signo): - signals = {signal.SIGTERM: 'SIGTERM', - signal.SIGINT: 'SIGINT'} - if _sighup_supported(): - signals[signal.SIGHUP] = 'SIGHUP' - return signals[signo] - - -def _set_signals_handler(handler): - signal.signal(signal.SIGTERM, handler) - signal.signal(signal.SIGINT, handler) - if _sighup_supported(): - signal.signal(signal.SIGHUP, handler) - - -class Launcher(object): - """Launch one or more services and wait for them to complete.""" - - def __init__(self): - """Initialize the service launcher. - - :returns: None - - """ - self.services = Services() - self.backdoor_port = eventlet_backdoor.initialize_if_enabled() - - def launch_service(self, service): - """Load and start the given service. - - :param service: The service you would like to start. - :returns: None - - """ - service.backdoor_port = self.backdoor_port - self.services.add(service) - - def stop(self): - """Stop all services which are currently running. - - :returns: None - - """ - self.services.stop() - - def wait(self): - """Waits until all services have been stopped, and then returns. - - :returns: None - - """ - self.services.wait() - - def restart(self): - """Reload config files and restart service. - - :returns: None - - """ - cfg.CONF.reload_config_files() - self.services.restart() - - -class SignalExit(SystemExit): - def __init__(self, signo, exccode=1): - super(SignalExit, self).__init__(exccode) - self.signo = signo - - -class ServiceLauncher(Launcher): - def _handle_signal(self, signo, frame): - # Allow the process to be killed again and die from natural causes - _set_signals_handler(signal.SIG_DFL) - raise SignalExit(signo) - - def handle_signal(self): - _set_signals_handler(self._handle_signal) - - def _wait_for_exit_or_signal(self, ready_callback=None): - status = None - signo = 0 - - LOG.debug('Full set of CONF:') - CONF.log_opt_values(LOG, std_logging.DEBUG) - - try: - if ready_callback: - ready_callback() - super(ServiceLauncher, self).wait() - except SignalExit as exc: - signame = _signo_to_signame(exc.signo) - LOG.info(_LI('Caught %s, exiting'), signame) - status = exc.code - signo = exc.signo - except SystemExit as exc: - status = exc.code - finally: - self.stop() - - return status, signo - - def wait(self, ready_callback=None): - systemd.notify_once() - while True: - self.handle_signal() - status, signo = self._wait_for_exit_or_signal(ready_callback) - if not _is_sighup_and_daemon(signo): - return status - self.restart() - - -class ServiceWrapper(object): - def __init__(self, service, workers): - self.service = service - self.workers = workers - self.children = set() - self.forktimes = [] - - -class ProcessLauncher(object): - def __init__(self, wait_interval=0.01): - """Constructor. - - :param wait_interval: The interval to sleep for between checks - of child process exit. - """ - self.children = {} - self.sigcaught = None - self.running = True - self.wait_interval = wait_interval - rfd, self.writepipe = os.pipe() - self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r') - self.handle_signal() - - def handle_signal(self): - _set_signals_handler(self._handle_signal) - - def _handle_signal(self, signo, frame): - self.sigcaught = signo - self.running = False - - # Allow the process to be killed again and die from natural causes - _set_signals_handler(signal.SIG_DFL) - - def _pipe_watcher(self): - # This will block until the write end is closed when the parent - # dies unexpectedly - self.readpipe.read() - - LOG.info(_LI('Parent process has died unexpectedly, exiting')) - - sys.exit(1) - - def _child_process_handle_signal(self): - # Setup child signal handlers differently - def _sigterm(*args): - signal.signal(signal.SIGTERM, signal.SIG_DFL) - raise SignalExit(signal.SIGTERM) - - def _sighup(*args): - signal.signal(signal.SIGHUP, signal.SIG_DFL) - raise SignalExit(signal.SIGHUP) - - signal.signal(signal.SIGTERM, _sigterm) - if _sighup_supported(): - signal.signal(signal.SIGHUP, _sighup) - # Block SIGINT and let the parent send us a SIGTERM - signal.signal(signal.SIGINT, signal.SIG_IGN) - - def _child_wait_for_exit_or_signal(self, launcher): - status = 0 - signo = 0 - - # NOTE(johannes): All exceptions are caught to ensure this - # doesn't fallback into the loop spawning children. It would - # be bad for a child to spawn more children. - try: - launcher.wait() - except SignalExit as exc: - signame = _signo_to_signame(exc.signo) - LOG.info(_LI('Child caught %s, exiting'), signame) - status = exc.code - signo = exc.signo - except SystemExit as exc: - status = exc.code - except BaseException: - LOG.exception(_LE('Unhandled exception')) - status = 2 - finally: - launcher.stop() - - return status, signo - - def _child_process(self, service): - self._child_process_handle_signal() - - # Reopen the eventlet hub to make sure we don't share an epoll - # fd with parent and/or siblings, which would be bad - eventlet.hubs.use_hub() - - # Close write to ensure only parent has it open - os.close(self.writepipe) - # Create greenthread to watch for parent to close pipe - eventlet.spawn_n(self._pipe_watcher) - - # Reseed random number generator - random.seed() - - launcher = Launcher() - launcher.launch_service(service) - return launcher - - def _start_child(self, wrap): - if len(wrap.forktimes) > wrap.workers: - # Limit ourselves to one process a second (over the period of - # number of workers * 1 second). This will allow workers to - # start up quickly but ensure we don't fork off children that - # die instantly too quickly. - if time.time() - wrap.forktimes[0] < wrap.workers: - LOG.info(_LI('Forking too fast, sleeping')) - time.sleep(1) - - wrap.forktimes.pop(0) - - wrap.forktimes.append(time.time()) - - pid = os.fork() - if pid == 0: - launcher = self._child_process(wrap.service) - while True: - self._child_process_handle_signal() - status, signo = self._child_wait_for_exit_or_signal(launcher) - if not _is_sighup_and_daemon(signo): - break - launcher.restart() - - os._exit(status) - - LOG.info(_LI('Started child %d'), pid) - - wrap.children.add(pid) - self.children[pid] = wrap - - return pid - - def launch_service(self, service, workers=1): - wrap = ServiceWrapper(service, workers) - - LOG.info(_LI('Starting %d workers'), wrap.workers) - while self.running and len(wrap.children) < wrap.workers: - self._start_child(wrap) - - def _wait_child(self): - try: - # Don't block if no child processes have exited - pid, status = os.waitpid(0, os.WNOHANG) - if not pid: - return None - except OSError as exc: - if exc.errno not in (errno.EINTR, errno.ECHILD): - raise - return None - - if os.WIFSIGNALED(status): - sig = os.WTERMSIG(status) - LOG.info(_LI('Child %(pid)d killed by signal %(sig)d'), - dict(pid=pid, sig=sig)) - else: - code = os.WEXITSTATUS(status) - LOG.info(_LI('Child %(pid)s exited with status %(code)d'), - dict(pid=pid, code=code)) - - if pid not in self.children: - LOG.warning(_LW('pid %d not in child list'), pid) - return None - - wrap = self.children.pop(pid) - wrap.children.remove(pid) - return wrap - - def _respawn_children(self): - while self.running: - wrap = self._wait_child() - if not wrap: - # Yield to other threads if no children have exited - # Sleep for a short time to avoid excessive CPU usage - # (see bug #1095346) - eventlet.greenthread.sleep(self.wait_interval) - continue - while self.running and len(wrap.children) < wrap.workers: - self._start_child(wrap) - - def wait(self): - """Loop waiting on children to die and respawning as necessary.""" - - systemd.notify_once() - LOG.debug('Full set of CONF:') - CONF.log_opt_values(LOG, std_logging.DEBUG) - - try: - while True: - self.handle_signal() - self._respawn_children() - # No signal means that stop was called. Don't clean up here. - if not self.sigcaught: - return - - signame = _signo_to_signame(self.sigcaught) - LOG.info(_LI('Caught %s, stopping children'), signame) - if not _is_sighup_and_daemon(self.sigcaught): - break - - for pid in self.children: - os.kill(pid, signal.SIGHUP) - self.running = True - self.sigcaught = None - except eventlet.greenlet.GreenletExit: - LOG.info(_LI("Wait called after thread killed. Cleaning up.")) - - self.stop() - - def stop(self): - """Terminate child processes and wait on each.""" - self.running = False - for pid in self.children: - try: - os.kill(pid, signal.SIGTERM) - except OSError as exc: - if exc.errno != errno.ESRCH: - raise - - # Wait for children to die - if self.children: - LOG.info(_LI('Waiting on %d children to exit'), len(self.children)) - while self.children: - self._wait_child() - - -class Service(object): - """Service object for binaries running on hosts.""" - - def __init__(self, threads=1000): - self.tg = threadgroup.ThreadGroup(threads) - - # signal that the service is done shutting itself down: - self._done = event.Event() - - def reset(self): - # NOTE(Fengqian): docs for Event.reset() recommend against using it - self._done = event.Event() - - def start(self): - pass - - def stop(self): - self.tg.stop() - self.tg.wait() - # Signal that service cleanup is done: - if not self._done.ready(): - self._done.send() - - def wait(self): - self._done.wait() - - -class Services(object): - - def __init__(self): - self.services = [] - self.tg = threadgroup.ThreadGroup() - self.done = event.Event() - - def add(self, service): - self.services.append(service) - self.tg.add_thread(self.run_service, service, self.done) - - def stop(self): - # wait for graceful shutdown of services: - for service in self.services: - service.stop() - service.wait() - - # Each service has performed cleanup, now signal that the run_service - # wrapper threads can now die: - if not self.done.ready(): - self.done.send() - - # reap threads: - self.tg.stop() - - def wait(self): - self.tg.wait() - - def restart(self): - self.stop() - self.done = event.Event() - for restart_service in self.services: - restart_service.reset() - self.tg.add_thread(self.run_service, restart_service, self.done) - - @staticmethod - def run_service(service, done): - """Service start wrapper. - - :param service: service to run - :param done: event to wait on until a shutdown is triggered - :returns: None - - """ - service.start() - done.wait() - - -def launch(service, workers=1): - if workers is None or workers == 1: - launcher = ServiceLauncher() - launcher.launch_service(service) - else: - launcher = ProcessLauncher() - launcher.launch_service(service, workers=workers) - - return launcher diff --git a/trove/openstack/common/sslutils.py b/trove/openstack/common/sslutils.py deleted file mode 100644 index 367add92b0..0000000000 --- a/trove/openstack/common/sslutils.py +++ /dev/null @@ -1,104 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2013 IBM Corp. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import os -import ssl - -from oslo_config import cfg - -from trove.openstack.common.gettextutils import _ # noqa - - -ssl_opts = [ - cfg.StrOpt('ca_file', - default=None, - help="CA certificate file to use to verify " - "connecting clients"), - cfg.StrOpt('cert_file', - default=None, - help="Certificate file to use when starting " - "the server securely"), - cfg.StrOpt('key_file', - default=None, - help="Private key file to use when starting " - "the server securely"), -] - - -CONF = cfg.CONF -CONF.register_opts(ssl_opts, "ssl") - - -def is_enabled(): - cert_file = CONF.ssl.cert_file - key_file = CONF.ssl.key_file - ca_file = CONF.ssl.ca_file - use_ssl = cert_file or key_file - - if cert_file and not os.path.exists(cert_file): - raise RuntimeError(_("Unable to find cert_file : %s") % cert_file) - - if ca_file and not os.path.exists(ca_file): - raise RuntimeError(_("Unable to find ca_file : %s") % ca_file) - - if key_file and not os.path.exists(key_file): - raise RuntimeError(_("Unable to find key_file : %s") % key_file) - - if use_ssl and (not cert_file or not key_file): - raise RuntimeError(_("When running server in SSL mode, you must " - "specify both a cert_file and key_file " - "option value in your configuration file")) - - return use_ssl - - -def wrap(sock): - ssl_kwargs = { - 'server_side': True, - 'certfile': CONF.ssl.cert_file, - 'keyfile': CONF.ssl.key_file, - 'cert_reqs': ssl.CERT_NONE, - } - - if CONF.ssl.ca_file: - ssl_kwargs['ca_certs'] = CONF.ssl.ca_file - ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED - - return ssl.wrap_socket(sock, **ssl_kwargs) - - -_SSL_PROTOCOLS = { - "tlsv1": ssl.PROTOCOL_TLSv1, - "sslv23": ssl.PROTOCOL_SSLv23, -} - -try: - _SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2 -except AttributeError: - pass - -try: - _SSL_PROTOCOLS["sslv3"] = ssl.PROTOCOL_SSLv3 -except AttributeError: - pass - - -def validate_ssl_version(version): - key = version.lower() - try: - return _SSL_PROTOCOLS[key] - except KeyError: - raise RuntimeError(_("Invalid SSL version : %s") % version) diff --git a/trove/openstack/common/systemd.py b/trove/openstack/common/systemd.py deleted file mode 100644 index 236f76fefa..0000000000 --- a/trove/openstack/common/systemd.py +++ /dev/null @@ -1,106 +0,0 @@ -# Copyright 2012-2014 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Helper module for systemd service readiness notification. -""" - -import os -import socket -import sys - -from trove.openstack.common import log as logging - - -LOG = logging.getLogger(__name__) - - -def _abstractify(socket_name): - if socket_name.startswith('@'): - # abstract namespace socket - socket_name = '\0%s' % socket_name[1:] - return socket_name - - -def _sd_notify(unset_env, msg): - notify_socket = os.getenv('NOTIFY_SOCKET') - if notify_socket: - sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) - try: - sock.connect(_abstractify(notify_socket)) - sock.sendall(msg) - if unset_env: - del os.environ['NOTIFY_SOCKET'] - except EnvironmentError: - LOG.debug("Systemd notification failed", exc_info=True) - finally: - sock.close() - - -def notify(): - """Send notification to Systemd that service is ready. - - For details see - http://www.freedesktop.org/software/systemd/man/sd_notify.html - """ - _sd_notify(False, 'READY=1') - - -def notify_once(): - """Send notification once to Systemd that service is ready. - - Systemd sets NOTIFY_SOCKET environment variable with the name of the - socket listening for notifications from services. - This method removes the NOTIFY_SOCKET environment variable to ensure - notification is sent only once. - """ - _sd_notify(True, 'READY=1') - - -def onready(notify_socket, timeout): - """Wait for systemd style notification on the socket. - - :param notify_socket: local socket address - :type notify_socket: string - :param timeout: socket timeout - :type timeout: float - :returns: 0 service ready - 1 service not ready - 2 timeout occurred - """ - sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) - sock.settimeout(timeout) - sock.bind(_abstractify(notify_socket)) - try: - msg = sock.recv(512) - except socket.timeout: - return 2 - finally: - sock.close() - if 'READY=1' in msg: - return 0 - else: - return 1 - - -if __name__ == '__main__': - # simple CLI for testing - if len(sys.argv) == 1: - notify() - elif len(sys.argv) >= 2: - timeout = float(sys.argv[1]) - notify_socket = os.getenv('NOTIFY_SOCKET') - if notify_socket: - retval = onready(notify_socket, timeout) - sys.exit(retval) diff --git a/trove/openstack/common/threadgroup.py b/trove/openstack/common/threadgroup.py deleted file mode 100644 index 4ffbc8c95a..0000000000 --- a/trove/openstack/common/threadgroup.py +++ /dev/null @@ -1,149 +0,0 @@ -# Copyright 2012 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -import threading - -import eventlet -from eventlet import greenpool - -from trove.openstack.common import log as logging -from trove.openstack.common import loopingcall - - -LOG = logging.getLogger(__name__) - - -def _thread_done(gt, *args, **kwargs): - """Callback function to be passed to GreenThread.link() when we spawn() - Calls the :class:`ThreadGroup` to notify if. - - """ - kwargs['group'].thread_done(kwargs['thread']) - - -class Thread(object): - """Wrapper around a greenthread, that holds a reference to the - :class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when - it has done so it can be removed from the threads list. - """ - def __init__(self, thread, group): - self.thread = thread - self.thread.link(_thread_done, group=group, thread=self) - - def stop(self): - self.thread.kill() - - def wait(self): - return self.thread.wait() - - def link(self, func, *args, **kwargs): - self.thread.link(func, *args, **kwargs) - - -class ThreadGroup(object): - """The point of the ThreadGroup class is to: - - * keep track of timers and greenthreads (making it easier to stop them - when need be). - * provide an easy API to add timers. - """ - def __init__(self, thread_pool_size=10): - self.pool = greenpool.GreenPool(thread_pool_size) - self.threads = [] - self.timers = [] - - def add_dynamic_timer(self, callback, initial_delay=None, - periodic_interval_max=None, *args, **kwargs): - timer = loopingcall.DynamicLoopingCall(callback, *args, **kwargs) - timer.start(initial_delay=initial_delay, - periodic_interval_max=periodic_interval_max) - self.timers.append(timer) - - def add_timer(self, interval, callback, initial_delay=None, - *args, **kwargs): - pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs) - pulse.start(interval=interval, - initial_delay=initial_delay) - self.timers.append(pulse) - - def add_thread(self, callback, *args, **kwargs): - gt = self.pool.spawn(callback, *args, **kwargs) - th = Thread(gt, self) - self.threads.append(th) - return th - - def thread_done(self, thread): - self.threads.remove(thread) - - def _stop_threads(self): - current = threading.current_thread() - - # Iterate over a copy of self.threads so thread_done doesn't - # modify the list while we're iterating - for x in self.threads[:]: - if x is current: - # don't kill the current thread. - continue - try: - x.stop() - except eventlet.greenlet.GreenletExit: - pass - except Exception as ex: - LOG.exception(ex) - - def stop_timers(self): - for x in self.timers: - try: - x.stop() - except Exception as ex: - LOG.exception(ex) - self.timers = [] - - def stop(self, graceful=False): - """stop function has the option of graceful=True/False. - - * In case of graceful=True, wait for all threads to be finished. - Never kill threads. - * In case of graceful=False, kill threads immediately. - """ - self.stop_timers() - if graceful: - # In case of graceful=True, wait for all threads to be - # finished, never kill threads - self.wait() - else: - # In case of graceful=False(Default), kill threads - # immediately - self._stop_threads() - - def wait(self): - for x in self.timers: - try: - x.wait() - except eventlet.greenlet.GreenletExit: - pass - except Exception as ex: - LOG.exception(ex) - current = threading.current_thread() - - # Iterate over a copy of self.threads so thread_done doesn't - # modify the list while we're iterating - for x in self.threads[:]: - if x is current: - continue - try: - x.wait() - except eventlet.greenlet.GreenletExit: - pass - except Exception as ex: - LOG.exception(ex) diff --git a/trove/openstack/common/wsgi.py b/trove/openstack/common/wsgi.py index 22496196eb..88676cf41d 100644 --- a/trove/openstack/common/wsgi.py +++ b/trove/openstack/common/wsgi.py @@ -30,6 +30,8 @@ import time import eventlet.wsgi from oslo_config import cfg +from oslo_service import service +from oslo_service import sslutils import routes import routes.middleware #import six @@ -42,8 +44,6 @@ from trove.openstack.common import exception from trove.openstack.common.gettextutils import _ from trove.openstack.common import jsonutils from trove.openstack.common import log as logging -from trove.openstack.common import service -from trove.openstack.common import sslutils from trove.openstack.common import xmlutils socket_opts = [ @@ -103,8 +103,8 @@ class Service(service.Service): sock = eventlet.listen(bind_addr, backlog=backlog, family=family) - if sslutils.is_enabled(): - sock = sslutils.wrap(sock) + if sslutils.is_enabled(CONF): + sock = sslutils.wrap(CONF, sock) except socket.error as err: if err.args[0] != errno.EADDRINUSE: diff --git a/trove/taskmanager/manager.py b/trove/taskmanager/manager.py index 39e93f5ad6..e1ec5b8942 100644 --- a/trove/taskmanager/manager.py +++ b/trove/taskmanager/manager.py @@ -16,6 +16,7 @@ from sets import Set import oslo_messaging as messaging +from oslo_service import periodic_task from oslo_utils import importutils from trove.backup.models import Backup @@ -30,7 +31,6 @@ from trove.common.strategies.cluster import strategy import trove.extensions.mgmt.instances.models as mgmtmodels from trove.instance.tasks import InstanceTasks from trove.openstack.common import log as logging -from trove.openstack.common import periodic_task from trove.taskmanager import models from trove.taskmanager.models import FreshInstanceTasks, BuiltInstanceTasks @@ -43,7 +43,7 @@ class Manager(periodic_task.PeriodicTasks): target = messaging.Target(version=rpc_version.RPC_API_VERSION) def __init__(self): - super(Manager, self).__init__() + super(Manager, self).__init__(CONF) self.admin_context = TroveContext( user=CONF.nova_proxy_admin_user, auth_token=CONF.nova_proxy_admin_pass, @@ -347,8 +347,7 @@ class Manager(periodic_task.PeriodicTasks): cluster_tasks.delete_cluster(context, cluster_id) if CONF.exists_notification_transformer: - @periodic_task.periodic_task( - ticks_between_runs=CONF.exists_notification_ticks) + @periodic_task.periodic_task def publish_exists_event(self, context): """ Push this in Instance Tasks to fetch a report/collection diff --git a/trove/tests/unittests/conductor/test_conf.py b/trove/tests/unittests/conductor/test_conf.py index 680460c17d..c4305bbe1b 100644 --- a/trove/tests/unittests/conductor/test_conf.py +++ b/trove/tests/unittests/conductor/test_conf.py @@ -14,11 +14,11 @@ from mock import MagicMock from mock import patch +from oslo_service import service as os_service from trove.cmd import common as common_cmd from trove.cmd import conductor as conductor_cmd import trove.common.cfg as cfg -from trove.openstack.common import service as os_service import trove.tests.fakes.conf as fake_conf from trove.tests.unittests import trove_testtools @@ -47,7 +47,7 @@ class ConductorConfTests(trove_testtools.TestCase): super(ConductorConfTests, self).tearDown() def _test_manager(self, conf, rt_mgr_name): - def mock_launch(server, workers): + def mock_launch(conf, server, workers): qualified_mgr = "%s.%s" % (server.manager_impl.__module__, server.manager_impl.__class__.__name__) self.assertEqual(rt_mgr_name, qualified_mgr, "Invalid manager") diff --git a/trove/tests/unittests/mgmt/test_models.py b/trove/tests/unittests/mgmt/test_models.py index 4d9f027bdb..bf8a998bed 100644 --- a/trove/tests/unittests/mgmt/test_models.py +++ b/trove/tests/unittests/mgmt/test_models.py @@ -81,8 +81,7 @@ class MockMgmtInstanceTest(trove_testtools.TestCase): self.addCleanup(self.admin_client_patch.stop) self.admin_client_patch.start() CONF.set_override('host', 'test_host') - CONF.set_override('exists_notification_ticks', 1) - CONF.set_override('report_interval', 20) + CONF.set_override('exists_notification_interval', 1) CONF.set_override('notification_service_id', {'mysql': '123'}) super(MockMgmtInstanceTest, self).setUp()