diff --git a/distil/collector/base.py b/distil/collector/base.py index dd8f502..d0749cf 100644 --- a/distil/collector/base.py +++ b/distil/collector/base.py @@ -25,7 +25,6 @@ from distil.db import api as db_api from distil import exceptions as exc from distil import transformer as d_transformer from distil.common import constants -from distil.common import general from distil.common import openstack LOG = logging.getLogger(__name__) @@ -46,7 +45,7 @@ class BaseCollector(object): def get_meter(self, project, meter, start, end): raise NotImplementedError - def collect_usage(self, project, start, end): + def collect_usage(self, project, windows): """Collect usage for specific tenant. :return: True if no error happened otherwise return False. @@ -54,15 +53,6 @@ class BaseCollector(object): LOG.info('collect_usage by %s for project: %s(%s)' % (self.__class__.__name__, project['id'], project['name'])) - windows = list(general.generate_windows(start, end)) - - if CONF.collector.max_windows_per_cycle > 0: - windows = windows[:CONF.collector.max_windows_per_cycle] - - if not windows: - LOG.info("Skipped project %s(%s), less than 1 hour since last " - "collection time.", project['id'], project['name']) - for window_start, window_end in windows: LOG.info("Project %s(%s) slice %s %s", project['id'], project['name'], window_start, window_end) diff --git a/distil/common/general.py b/distil/common/general.py index e129302..e4e4a68 100644 --- a/distil/common/general.py +++ b/distil/common/general.py @@ -46,14 +46,22 @@ def get_transformer_config(): return _TRANS_CONFIG -def generate_windows(start, end): - """Generator for configured hour windows in a given range.""" +def get_windows(start, end): + """Get configured hour windows in a given range.""" + windows = [] window_size = timedelta(hours=CONF.collector.collect_window) + while start + window_size <= end: window_end = start + window_size - yield start, window_end + windows.append((start, window_end)) + + if len(windows) >= CONF.collector.max_windows_per_cycle: + break + start = window_end + return windows + def log_and_time_it(f): def decorator(*args, **kwargs): diff --git a/distil/config.py b/distil/config.py index f63c3b1..0654a78 100644 --- a/distil/config.py +++ b/distil/config.py @@ -52,7 +52,7 @@ COLLECTOR_OPTS = [ help=('Window of usage collection in hours.')), cfg.StrOpt('collector_backend', default='ceilometer', help=('Data collector.')), - cfg.IntOpt('max_windows_per_cycle', default=0, + cfg.IntOpt('max_windows_per_cycle', default=1, help=('The maximum number of windows per collecting cycle.')), cfg.StrOpt('meter_mappings_file', default='/etc/distil/meter_mappings.yml', help=('The meter mappings configuration.')), @@ -110,6 +110,15 @@ RATER_OPTS = [ 'is "file".'), ] +CLI_OPTS = [ + cfg.StrOpt( + 'collect-end-time', + help=('The end date of usage to collect before distil-collector is ' + 'stopped. If not provided, distil-collector will keep running. ' + 'Time format is %Y-%m-%dT%H:%M:%S') + ), +] + AUTH_GROUP = 'keystone_authtoken' ODOO_GROUP = 'odoo' COLLECTOR_GROUP = 'collector' @@ -120,6 +129,7 @@ CONF.register_opts(DEFAULT_OPTIONS) CONF.register_opts(ODOO_OPTS, group=ODOO_GROUP) CONF.register_opts(COLLECTOR_OPTS, group=COLLECTOR_GROUP) CONF.register_opts(RATER_OPTS, group=RATER_GROUP) +CONF.register_cli_opts(CLI_OPTS) def list_opts(): diff --git a/distil/service/collector.py b/distil/service/collector.py index d6c9153..452e0be 100644 --- a/distil/service/collector.py +++ b/distil/service/collector.py @@ -13,6 +13,7 @@ # under the License. from datetime import datetime +import os from random import shuffle from oslo_config import cfg @@ -23,6 +24,7 @@ from stevedore import driver from distil.db import api as db_api from distil import exceptions +from distil.common import constants from distil.common import general from distil.common import openstack @@ -110,6 +112,11 @@ class CollectorService(service.Service): def collect_usage(self): LOG.info("Starting to collect usage...") + if CONF.collector.max_windows_per_cycle <= 0: + LOG.info("Finished collecting usage with configuration " + "max_windows_per_cycle<=0.") + return + projects = openstack.get_projects() project_ids = [p['id'] for p in projects] valid_projects = filter_projects(projects) @@ -119,7 +126,15 @@ class CollectorService(service.Service): last_collect = db_api.get_last_collect(project_ids).last_collected end = datetime.utcnow().replace(minute=0, second=0, microsecond=0) - count = 0 + if CONF.collect_end_time: + end = datetime.strptime(CONF.collect_end_time, constants.iso_time) + + # Number of projects updated successfully. + success_count = 0 + # Number of projects processed actually. + processed_count = 0 + # Number of projects already up-to-date. + updated_count = 0 valid_projects = self._get_projects_by_order(valid_projects) for project in valid_projects: @@ -127,7 +142,6 @@ class CollectorService(service.Service): # instance. If no, will get a lock and continue processing, # otherwise just skip it. locks = db_api.get_project_locks(project['id']) - if locks and locks[0].owner != self.identifier: LOG.debug( "Project %s is being processed by collector %s." % @@ -137,13 +151,31 @@ class CollectorService(service.Service): try: with db_api.project_lock(project['id'], self.identifier): + processed_count += 1 + # Add a project or get last_collected of existing project. db_project = db_api.project_add(project, last_collect) start = db_project.last_collected - if self.collector.collect_usage(project, start, end): - count = count + 1 + windows = general.get_windows(start, end) + if not windows: + LOG.info( + "project %s(%s) already up-to-date.", + project['id'], project['name'] + ) + updated_count += 1 + continue + + if self.collector.collect_usage(project, windows): + success_count += 1 except Exception: LOG.warning('Get lock failed. Process: %s' % self.identifier) - LOG.info("Finished collecting usage for %s projects." % count) + LOG.info("Finished collecting usage for %s projects." % success_count) + + # If we start distil-collector manually with 'collect_end_time' param + # specified, the service should be stopped automatically after all + # projects usage collection is up-to-date. + if CONF.collect_end_time and updated_count == processed_count: + self.stop() + os.kill(os.getpid(), 9) diff --git a/distil/tests/unit/base.py b/distil/tests/unit/base.py index 6602a1f..70466a3 100644 --- a/distil/tests/unit/base.py +++ b/distil/tests/unit/base.py @@ -97,6 +97,7 @@ class DistilTestCase(base.BaseTestCase): """ for k, v in kw.items(): self.conf.set_override(k, v, group) + self.addCleanup(self.conf.clear_override, k, group) def _my_dir(self): return os.path.abspath(os.path.dirname(__file__)) diff --git a/distil/tests/unit/collector/test_base.py b/distil/tests/unit/collector/test_base.py index 58ea4be..32e54cf 100644 --- a/distil/tests/unit/collector/test_base.py +++ b/distil/tests/unit/collector/test_base.py @@ -148,8 +148,7 @@ class CollectorBaseTest(base.DistilWithDbTestCase): srv = collector.CollectorService() ret = srv.collector.collect_usage( {'name': 'fake_project', 'id': '123'}, - datetime.utcnow() - timedelta(hours=1.5), - datetime.utcnow() + [(datetime.utcnow() - timedelta(hours=1), datetime.utcnow())] ) self.assertFalse(ret) diff --git a/distil/tests/unit/service/test_collector.py b/distil/tests/unit/service/test_collector.py index e9ae683..52542f8 100644 --- a/distil/tests/unit/service/test_collector.py +++ b/distil/tests/unit/service/test_collector.py @@ -13,6 +13,7 @@ # under the License. from datetime import datetime +from datetime import timedelta import hashlib import json import os @@ -20,6 +21,8 @@ import os import mock from distil.collector import base as collector_base +from distil.common import constants +from distil import config from distil.db.sqlalchemy import api as db_api from distil.service import collector from distil.tests.unit import base @@ -86,7 +89,7 @@ class CollectorTest(base.DistilWithDbTestCase): ] collector = collector_base.BaseCollector() - collector.collect_usage(project, start_time, end_time) + collector.collect_usage(project, [(start_time, end_time)]) resources = db_api.resource_get_by_ids(project_id, [resource_id_hash]) res_info = json.loads(resources[0].info) @@ -144,22 +147,17 @@ class CollectorTest(base.DistilWithDbTestCase): project_2_collect ) - end = datetime.utcnow().replace(minute=0, second=0, microsecond=0) - svc = collector.CollectorService() svc.collect_usage() mock_collect_usage.assert_called_once_with( {'id': '333', 'name': 'project_3', 'description': ''}, - project_1_collect, - end + [(project_1_collect, project_1_collect + timedelta(hours=1))] ) - @mock.patch('distil.db.api.get_project_locks') @mock.patch('distil.common.openstack.get_ceilometer_client') @mock.patch('distil.common.openstack.get_projects') - def test_project_order_ascending(self, mock_get_projects, mock_cclient, - mock_getlocks): + def test_project_order_ascending(self, mock_get_projects, mock_cclient): mock_get_projects.return_value = [ {'id': '111', 'name': 'project_1', 'description': ''}, {'id': '222', 'name': 'project_2', 'description': ''}, @@ -174,23 +172,25 @@ class CollectorTest(base.DistilWithDbTestCase): 'name': 'project_1', 'description': '', }, - datetime(2017, 5, 17, 19) + datetime.utcnow() - timedelta(hours=2) ) svc = collector.CollectorService() + svc.collector = mock.Mock() svc.collect_usage() + expected_projects = [] + for call in svc.collector.collect_usage.call_args_list: + expected_projects.append(call[0][0]['id']) + self.assertEqual( - [mock.call('111'), mock.call('222'), mock.call('333'), - mock.call('444')], - mock_getlocks.call_args_list + ['111', '222', '333', '444'], + expected_projects ) - @mock.patch('distil.db.api.get_project_locks') @mock.patch('distil.common.openstack.get_ceilometer_client') @mock.patch('distil.common.openstack.get_projects') - def test_project_order_descending(self, mock_get_projects, mock_cclient, - mock_getlocks): + def test_project_order_descending(self, mock_get_projects, mock_cclient): self.override_config('collector', project_order='descending') mock_get_projects.return_value = [ @@ -207,23 +207,25 @@ class CollectorTest(base.DistilWithDbTestCase): 'name': 'project_1', 'description': '', }, - datetime(2017, 5, 17, 19) + datetime.utcnow() - timedelta(hours=2) ) svc = collector.CollectorService() + svc.collector = mock.Mock() svc.collect_usage() + expected_projects = [] + for call in svc.collector.collect_usage.call_args_list: + expected_projects.append(call[0][0]['id']) + self.assertEqual( - [mock.call('444'), mock.call('333'), mock.call('222'), - mock.call('111')], - mock_getlocks.call_args_list + ['444', '333', '222', '111'], + expected_projects ) - @mock.patch('distil.db.api.get_project_locks') @mock.patch('distil.common.openstack.get_ceilometer_client') @mock.patch('distil.common.openstack.get_projects') - def test_project_order_random(self, mock_get_projects, mock_cclient, - mock_getlocks): + def test_project_order_random(self, mock_get_projects, mock_cclient): self.override_config('collector', project_order='random') mock_get_projects.return_value = [ @@ -240,14 +242,51 @@ class CollectorTest(base.DistilWithDbTestCase): 'name': 'project_1', 'description': '', }, - datetime(2017, 5, 17, 19) + datetime.utcnow() - timedelta(hours=2) ) svc = collector.CollectorService() + svc.collector = mock.Mock() svc.collect_usage() + expected_projects = [] + for call in svc.collector.collect_usage.call_args_list: + expected_projects.append(call[0][0]['id']) + self.assertNotEqual( - [mock.call('111'), mock.call('222'), mock.call('333'), - mock.call('444')], - mock_getlocks.call_args_list + ['111', '222', '333', '444'], + expected_projects ) + + @mock.patch('os.kill') + @mock.patch('distil.common.openstack.get_ceilometer_client') + @mock.patch('distil.common.openstack.get_projects') + def test_collect_with_end_time(self, mock_get_projects, mock_cclient, + mock_kill): + end_time = datetime.utcnow() + timedelta(hours=0.5) + end_time_str = end_time.strftime(constants.iso_time) + self.override_config(collect_end_time=end_time_str) + + mock_get_projects.return_value = [ + { + 'id': '111', + 'name': 'project_1', + 'description': 'description' + } + ] + # Insert the project info in the database. + db_api.project_add( + { + 'id': '111', + 'name': 'project_1', + 'description': '', + }, + datetime.utcnow() + ) + + srv = collector.CollectorService() + srv.thread_grp = mock.Mock() + srv.collect_usage() + + self.assertEqual(1, srv.thread_grp.stop.call_count) + self.assertEqual(1, mock_kill.call_count)