From 68650cfbdcbdc3221aeb35e608027cd91d5c5a01 Mon Sep 17 00:00:00 2001 From: Lingxian Kong Date: Fri, 3 Mar 2017 15:29:41 +1300 Subject: [PATCH] Add 'collect_end_time' param for distil-collector command This change gives us a convenient way to do usage collection catch-up to a specific date. By default the end time when collection happens is now() and the collection service will keep running indefinitely. This patch adds a command line param called 'collect_end_time'. If we start distil-collector manually with 'collect_end_time' param specified, the service should be stopped automatically after usage collection for all projects is up to date. Change-Id: I0d0aebcf3dfeeb987155bb8d92f2aee1918ceac8 --- distil/collector/base.py | 12 +-- distil/common/general.py | 14 +++- distil/config.py | 12 ++- distil/service/collector.py | 42 ++++++++-- distil/tests/unit/base.py | 1 + distil/tests/unit/collector/test_base.py | 3 +- distil/tests/unit/service/test_collector.py | 91 +++++++++++++++------ 7 files changed, 127 insertions(+), 48 deletions(-) diff --git a/distil/collector/base.py b/distil/collector/base.py index dd8f502..d0749cf 100644 --- a/distil/collector/base.py +++ b/distil/collector/base.py @@ -25,7 +25,6 @@ from distil.db import api as db_api from distil import exceptions as exc from distil import transformer as d_transformer from distil.common import constants -from distil.common import general from distil.common import openstack LOG = logging.getLogger(__name__) @@ -46,7 +45,7 @@ class BaseCollector(object): def get_meter(self, project, meter, start, end): raise NotImplementedError - def collect_usage(self, project, start, end): + def collect_usage(self, project, windows): """Collect usage for specific tenant. :return: True if no error happened otherwise return False. 
@@ -54,15 +53,6 @@ class BaseCollector(object): LOG.info('collect_usage by %s for project: %s(%s)' % (self.__class__.__name__, project['id'], project['name'])) - windows = list(general.generate_windows(start, end)) - - if CONF.collector.max_windows_per_cycle > 0: - windows = windows[:CONF.collector.max_windows_per_cycle] - - if not windows: - LOG.info("Skipped project %s(%s), less than 1 hour since last " - "collection time.", project['id'], project['name']) - for window_start, window_end in windows: LOG.info("Project %s(%s) slice %s %s", project['id'], project['name'], window_start, window_end) diff --git a/distil/common/general.py b/distil/common/general.py index e129302..e4e4a68 100644 --- a/distil/common/general.py +++ b/distil/common/general.py @@ -46,14 +46,22 @@ def get_transformer_config(): return _TRANS_CONFIG -def generate_windows(start, end): - """Generator for configured hour windows in a given range.""" +def get_windows(start, end): + """Get configured hour windows in a given range.""" + windows = [] window_size = timedelta(hours=CONF.collector.collect_window) + while start + window_size <= end: window_end = start + window_size - yield start, window_end + windows.append((start, window_end)) + + if len(windows) >= CONF.collector.max_windows_per_cycle: + break + start = window_end + return windows + def log_and_time_it(f): def decorator(*args, **kwargs): diff --git a/distil/config.py b/distil/config.py index f63c3b1..0654a78 100644 --- a/distil/config.py +++ b/distil/config.py @@ -52,7 +52,7 @@ COLLECTOR_OPTS = [ help=('Window of usage collection in hours.')), cfg.StrOpt('collector_backend', default='ceilometer', help=('Data collector.')), - cfg.IntOpt('max_windows_per_cycle', default=0, + cfg.IntOpt('max_windows_per_cycle', default=1, help=('The maximum number of windows per collecting cycle.')), cfg.StrOpt('meter_mappings_file', default='/etc/distil/meter_mappings.yml', help=('The meter mappings configuration.')), @@ -110,6 +110,15 @@ RATER_OPTS = [ 
'is "file".'), ] +CLI_OPTS = [ + cfg.StrOpt( + 'collect-end-time', + help=('The end date of usage to collect before distil-collector is ' + 'stopped. If not provided, distil-collector will keep running. ' + 'Time format is %Y-%m-%dT%H:%M:%S') + ), +] + AUTH_GROUP = 'keystone_authtoken' ODOO_GROUP = 'odoo' COLLECTOR_GROUP = 'collector' @@ -120,6 +129,7 @@ CONF.register_opts(DEFAULT_OPTIONS) CONF.register_opts(ODOO_OPTS, group=ODOO_GROUP) CONF.register_opts(COLLECTOR_OPTS, group=COLLECTOR_GROUP) CONF.register_opts(RATER_OPTS, group=RATER_GROUP) +CONF.register_cli_opts(CLI_OPTS) def list_opts(): diff --git a/distil/service/collector.py b/distil/service/collector.py index d6c9153..452e0be 100644 --- a/distil/service/collector.py +++ b/distil/service/collector.py @@ -13,6 +13,7 @@ # under the License. from datetime import datetime +import os from random import shuffle from oslo_config import cfg @@ -23,6 +24,7 @@ from stevedore import driver from distil.db import api as db_api from distil import exceptions +from distil.common import constants from distil.common import general from distil.common import openstack @@ -110,6 +112,11 @@ class CollectorService(service.Service): def collect_usage(self): LOG.info("Starting to collect usage...") + if CONF.collector.max_windows_per_cycle <= 0: + LOG.info("Finished collecting usage with configuration " + "max_windows_per_cycle<=0.") + return + projects = openstack.get_projects() project_ids = [p['id'] for p in projects] valid_projects = filter_projects(projects) @@ -119,7 +126,15 @@ class CollectorService(service.Service): last_collect = db_api.get_last_collect(project_ids).last_collected end = datetime.utcnow().replace(minute=0, second=0, microsecond=0) - count = 0 + if CONF.collect_end_time: + end = datetime.strptime(CONF.collect_end_time, constants.iso_time) + + # Number of projects updated successfully. + success_count = 0 + # Number of projects processed actually. 
+ processed_count = 0 + # Number of projects already up-to-date. + updated_count = 0 valid_projects = self._get_projects_by_order(valid_projects) for project in valid_projects: @@ -127,7 +142,6 @@ class CollectorService(service.Service): # instance. If no, will get a lock and continue processing, # otherwise just skip it. locks = db_api.get_project_locks(project['id']) - if locks and locks[0].owner != self.identifier: LOG.debug( "Project %s is being processed by collector %s." % @@ -137,13 +151,31 @@ class CollectorService(service.Service): try: with db_api.project_lock(project['id'], self.identifier): + processed_count += 1 + # Add a project or get last_collected of existing project. db_project = db_api.project_add(project, last_collect) start = db_project.last_collected - if self.collector.collect_usage(project, start, end): - count = count + 1 + windows = general.get_windows(start, end) + if not windows: + LOG.info( + "project %s(%s) already up-to-date.", + project['id'], project['name'] + ) + updated_count += 1 + continue + + if self.collector.collect_usage(project, windows): + success_count += 1 except Exception: LOG.warning('Get lock failed. Process: %s' % self.identifier) - LOG.info("Finished collecting usage for %s projects." % count) + LOG.info("Finished collecting usage for %s projects." % success_count) + + # If we start distil-collector manually with 'collect_end_time' param + # specified, the service should be stopped automatically after all + # projects usage collection is up-to-date. 
+ if CONF.collect_end_time and updated_count == processed_count: + self.stop() + os.kill(os.getpid(), 9) diff --git a/distil/tests/unit/base.py b/distil/tests/unit/base.py index 6602a1f..70466a3 100644 --- a/distil/tests/unit/base.py +++ b/distil/tests/unit/base.py @@ -97,6 +97,7 @@ class DistilTestCase(base.BaseTestCase): """ for k, v in kw.items(): self.conf.set_override(k, v, group) + self.addCleanup(self.conf.clear_override, k, group) def _my_dir(self): return os.path.abspath(os.path.dirname(__file__)) diff --git a/distil/tests/unit/collector/test_base.py b/distil/tests/unit/collector/test_base.py index 58ea4be..32e54cf 100644 --- a/distil/tests/unit/collector/test_base.py +++ b/distil/tests/unit/collector/test_base.py @@ -148,8 +148,7 @@ class CollectorBaseTest(base.DistilWithDbTestCase): srv = collector.CollectorService() ret = srv.collector.collect_usage( {'name': 'fake_project', 'id': '123'}, - datetime.utcnow() - timedelta(hours=1.5), - datetime.utcnow() + [(datetime.utcnow() - timedelta(hours=1), datetime.utcnow())] ) self.assertFalse(ret) diff --git a/distil/tests/unit/service/test_collector.py b/distil/tests/unit/service/test_collector.py index e9ae683..52542f8 100644 --- a/distil/tests/unit/service/test_collector.py +++ b/distil/tests/unit/service/test_collector.py @@ -13,6 +13,7 @@ # under the License. 
from datetime import datetime +from datetime import timedelta import hashlib import json import os @@ -20,6 +21,8 @@ import os import mock from distil.collector import base as collector_base +from distil.common import constants +from distil import config from distil.db.sqlalchemy import api as db_api from distil.service import collector from distil.tests.unit import base @@ -86,7 +89,7 @@ class CollectorTest(base.DistilWithDbTestCase): ] collector = collector_base.BaseCollector() - collector.collect_usage(project, start_time, end_time) + collector.collect_usage(project, [(start_time, end_time)]) resources = db_api.resource_get_by_ids(project_id, [resource_id_hash]) res_info = json.loads(resources[0].info) @@ -144,22 +147,17 @@ class CollectorTest(base.DistilWithDbTestCase): project_2_collect ) - end = datetime.utcnow().replace(minute=0, second=0, microsecond=0) - svc = collector.CollectorService() svc.collect_usage() mock_collect_usage.assert_called_once_with( {'id': '333', 'name': 'project_3', 'description': ''}, - project_1_collect, - end + [(project_1_collect, project_1_collect + timedelta(hours=1))] ) - @mock.patch('distil.db.api.get_project_locks') @mock.patch('distil.common.openstack.get_ceilometer_client') @mock.patch('distil.common.openstack.get_projects') - def test_project_order_ascending(self, mock_get_projects, mock_cclient, - mock_getlocks): + def test_project_order_ascending(self, mock_get_projects, mock_cclient): mock_get_projects.return_value = [ {'id': '111', 'name': 'project_1', 'description': ''}, {'id': '222', 'name': 'project_2', 'description': ''}, @@ -174,23 +172,25 @@ class CollectorTest(base.DistilWithDbTestCase): 'name': 'project_1', 'description': '', }, - datetime(2017, 5, 17, 19) + datetime.utcnow() - timedelta(hours=2) ) svc = collector.CollectorService() + svc.collector = mock.Mock() svc.collect_usage() + expected_projects = [] + for call in svc.collector.collect_usage.call_args_list: + expected_projects.append(call[0][0]['id']) + 
self.assertEqual( - [mock.call('111'), mock.call('222'), mock.call('333'), - mock.call('444')], - mock_getlocks.call_args_list + ['111', '222', '333', '444'], + expected_projects ) - @mock.patch('distil.db.api.get_project_locks') @mock.patch('distil.common.openstack.get_ceilometer_client') @mock.patch('distil.common.openstack.get_projects') - def test_project_order_descending(self, mock_get_projects, mock_cclient, - mock_getlocks): + def test_project_order_descending(self, mock_get_projects, mock_cclient): self.override_config('collector', project_order='descending') mock_get_projects.return_value = [ @@ -207,23 +207,25 @@ class CollectorTest(base.DistilWithDbTestCase): 'name': 'project_1', 'description': '', }, - datetime(2017, 5, 17, 19) + datetime.utcnow() - timedelta(hours=2) ) svc = collector.CollectorService() + svc.collector = mock.Mock() svc.collect_usage() + expected_projects = [] + for call in svc.collector.collect_usage.call_args_list: + expected_projects.append(call[0][0]['id']) + self.assertEqual( - [mock.call('444'), mock.call('333'), mock.call('222'), - mock.call('111')], - mock_getlocks.call_args_list + ['444', '333', '222', '111'], + expected_projects ) - @mock.patch('distil.db.api.get_project_locks') @mock.patch('distil.common.openstack.get_ceilometer_client') @mock.patch('distil.common.openstack.get_projects') - def test_project_order_random(self, mock_get_projects, mock_cclient, - mock_getlocks): + def test_project_order_random(self, mock_get_projects, mock_cclient): self.override_config('collector', project_order='random') mock_get_projects.return_value = [ @@ -240,14 +242,51 @@ class CollectorTest(base.DistilWithDbTestCase): 'name': 'project_1', 'description': '', }, - datetime(2017, 5, 17, 19) + datetime.utcnow() - timedelta(hours=2) ) svc = collector.CollectorService() + svc.collector = mock.Mock() svc.collect_usage() + expected_projects = [] + for call in svc.collector.collect_usage.call_args_list: + 
expected_projects.append(call[0][0]['id']) + self.assertNotEqual( - [mock.call('111'), mock.call('222'), mock.call('333'), - mock.call('444')], - mock_getlocks.call_args_list + ['111', '222', '333', '444'], + expected_projects ) + + @mock.patch('os.kill') + @mock.patch('distil.common.openstack.get_ceilometer_client') + @mock.patch('distil.common.openstack.get_projects') + def test_collect_with_end_time(self, mock_get_projects, mock_cclient, + mock_kill): + end_time = datetime.utcnow() + timedelta(hours=0.5) + end_time_str = end_time.strftime(constants.iso_time) + self.override_config(collect_end_time=end_time_str) + + mock_get_projects.return_value = [ + { + 'id': '111', + 'name': 'project_1', + 'description': 'description' + } + ] + # Insert the project info in the database. + db_api.project_add( + { + 'id': '111', + 'name': 'project_1', + 'description': '', + }, + datetime.utcnow() + ) + + srv = collector.CollectorService() + srv.thread_grp = mock.Mock() + srv.collect_usage() + + self.assertEqual(1, srv.thread_grp.stop.call_count) + self.assertEqual(1, mock_kill.call_count)