diff --git a/ceilometer/cli.py b/ceilometer/cli.py index 26c011afd7..ee671fb894 100644 --- a/ceilometer/cli.py +++ b/ceilometer/cli.py @@ -84,8 +84,12 @@ def agent_compute(): def agent_notification(): service.prepare_service() - os_service.launch(notification.NotificationService( - cfg.CONF.host, 'ceilometer.agent.notification')).wait() + launcher = os_service.ProcessLauncher() + launcher.launch_service( + notification.NotificationService(cfg.CONF.host, + 'ceilometer.agent.notification'), + workers=service.get_workers('notification')) + launcher.wait() def api(): @@ -96,8 +100,12 @@ def api(): def collector_service(): service.prepare_service() - os_service.launch(collector.CollectorService( - cfg.CONF.host, 'ceilometer.collector')).wait() + launcher = os_service.ProcessLauncher() + launcher.launch_service( + collector.CollectorService(cfg.CONF.host, + 'ceilometer.collector'), + workers=service.get_workers('collector')) + launcher.wait() def storage_dbsync(): diff --git a/ceilometer/service.py b/ceilometer/service.py index f69ee3db5c..76c93010ce 100644 --- a/ceilometer/service.py +++ b/ceilometer/service.py @@ -28,6 +28,7 @@ from ceilometer.openstack.common import gettextutils from ceilometer.openstack.common.gettextutils import _ # noqa from ceilometer.openstack.common import log from ceilometer.openstack.common import rpc +from ceilometer import utils OPTS = [ @@ -40,6 +41,12 @@ OPTS = [ deprecated_group="collector", default=['database'], help='Dispatcher to process data.'), + cfg.IntOpt('collector_workers', + help='Number of workers for collector service. The default ' + 'will be equal to the number of CPUs available.'), + cfg.IntOpt('notification_workers', + help='Number of workers for notification service. The default ' + 'will be equal to the number of CPUs available.'), ] cfg.CONF.register_opts(OPTS) @@ -88,6 +95,11 @@ cfg.CONF.register_cli_opts(CLI_OPTIONS, group="service_credentials") LOG = log.getLogger(__name__) +class WorkerException(Exception): + """Exception for errors relating to service workers + """ + + class DispatchedService(object): DISPATCHER_NAMESPACE = 'ceilometer.dispatcher' @@ -106,6 +118,17 @@ class DispatchedService(object): self.DISPATCHER_NAMESPACE) +def get_workers(name): + workers = (cfg.CONF.get('%s_workers' % name) or + utils.cpu_count()) + if workers and workers < 1: + msg = (_("%(worker_name)s value of %(workers)s is invalid, " + "must be greater than 0") % + {'worker_name': '%s_workers' % name, 'workers': str(workers)}) + raise WorkerException(msg) + return workers + + def prepare_service(argv=None): gettextutils.install('ceilometer', lazy=True) rpc.set_defaults(control_exchange='ceilometer') diff --git a/ceilometer/utils.py b/ceilometer/utils.py index e7301b8acf..0326395dcb 100644 --- a/ceilometer/utils.py +++ b/ceilometer/utils.py @@ -22,6 +22,7 @@ import calendar import copy import datetime import decimal +import multiprocessing from ceilometer.openstack.common import timeutils from ceilometer.openstack.common import units @@ -147,3 +148,10 @@ def update_nested(original_dict, updates): else: dict_to_update[key] = updates[key] return dict_to_update + + +def cpu_count(): + try: + return multiprocessing.cpu_count() or 1 + except NotImplementedError: + return 1 diff --git a/etc/ceilometer/ceilometer.conf.sample b/etc/ceilometer/ceilometer.conf.sample index 846ed08332..e0c1e56d65 100644 --- a/etc/ceilometer/ceilometer.conf.sample +++ b/etc/ceilometer/ceilometer.conf.sample @@ -40,6 +40,14 @@ # Dispatcher to process data. (multi valued) #dispatcher=database +# Number of workers for collector service. The default will be +# equal to the number of CPUs available. (integer value) +#collector_workers= + +# Number of workers for notification service. The default will +# be equal to the number of CPUs available. (integer value) +#notification_workers= + # # Options defined in ceilometer.api.app