spawn multiple workers in services

the collector service only ever grabs one connection to the
database at a time. this causes a massive backlog on the message
queue. enable ability to spawn multiple workers for notification
and collector services.

Change-Id: Ie85c4a6eace52c771c3eb8425839d253f8dd74cd
Closes-Bug: #1291054
This commit is contained in:
Gordon Chung 2014-03-12 10:58:20 -04:00
parent e0f83ff425
commit 0f038d2fe2
4 changed files with 51 additions and 4 deletions

View File

@ -84,8 +84,12 @@ def agent_compute():
def agent_notification():
service.prepare_service()
os_service.launch(notification.NotificationService(
cfg.CONF.host, 'ceilometer.agent.notification')).wait()
launcher = os_service.ProcessLauncher()
launcher.launch_service(
notification.NotificationService(cfg.CONF.host,
'ceilometer.agent.notification'),
workers=service.get_workers('notification'))
launcher.wait()
def api():
@ -96,8 +100,12 @@ def api():
def collector_service():
service.prepare_service()
os_service.launch(collector.CollectorService(
cfg.CONF.host, 'ceilometer.collector')).wait()
launcher = os_service.ProcessLauncher()
launcher.launch_service(
collector.CollectorService(cfg.CONF.host,
'ceilometer.collector'),
workers=service.get_workers('collector'))
launcher.wait()
def storage_dbsync():

View File

@ -28,6 +28,7 @@ from ceilometer.openstack.common import gettextutils
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import log
from ceilometer.openstack.common import rpc
from ceilometer import utils
OPTS = [
@ -40,6 +41,12 @@ OPTS = [
deprecated_group="collector",
default=['database'],
help='Dispatcher to process data.'),
cfg.IntOpt('collector_workers',
help='Number of workers for collector service. The default '
'will be equal to the number of CPUs available.'),
cfg.IntOpt('notification_workers',
help='Number of workers for notification service. The default '
'will be equal to the number of CPUs available.'),
]
cfg.CONF.register_opts(OPTS)
@ -88,6 +95,11 @@ cfg.CONF.register_cli_opts(CLI_OPTIONS, group="service_credentials")
LOG = log.getLogger(__name__)
class WorkerException(Exception):
"""Exception for errors relating to service workers
"""
class DispatchedService(object):
DISPATCHER_NAMESPACE = 'ceilometer.dispatcher'
@ -106,6 +118,17 @@ class DispatchedService(object):
self.DISPATCHER_NAMESPACE)
def get_workers(name):
workers = (cfg.CONF.get('%s_workers' % name) or
utils.cpu_count())
if workers and workers < 1:
msg = (_("%(worker_name)s value of %(workers)s is invalid, "
"must be greater than 0") %
{'worker_name': '%s_workers' % name, 'workers': str(workers)})
raise WorkerException(msg)
return workers
def prepare_service(argv=None):
gettextutils.install('ceilometer', lazy=True)
rpc.set_defaults(control_exchange='ceilometer')

View File

@ -22,6 +22,7 @@ import calendar
import copy
import datetime
import decimal
import multiprocessing
from ceilometer.openstack.common import timeutils
from ceilometer.openstack.common import units
@ -147,3 +148,10 @@ def update_nested(original_dict, updates):
else:
dict_to_update[key] = updates[key]
return dict_to_update
def cpu_count():
try:
return multiprocessing.cpu_count() or 1
except NotImplementedError:
return 1

View File

@ -40,6 +40,14 @@
# Dispatcher to process data. (multi valued)
#dispatcher=database
# Number of workers for collector service. The default will be
# equal to the number of CPUs available. (integer value)
#collector_workers=<None>
# Number of workers for notification service. The default will
# be equal to the number of CPUs available. (integer value)
#notification_workers=<None>
#
# Options defined in ceilometer.api.app