Add 'collect_end_time' param for distil-collector command

This change gives us a convenient way to do usage collection
catch-up to a specific date. By default the end time when collection
happens is now() and the collection service will keep running
indefinitely.

This patch adds a command line param called 'collect_end_time'.

If we start distil-collector manually with 'collect_end_time' param
specified, the service should be stopped automatically after all
projects usage collection is up-to-date.

Change-Id: I0d0aebcf3dfeeb987155bb8d92f2aee1918ceac8
This commit is contained in:
Lingxian Kong 2017-03-03 15:29:41 +13:00
parent d906c733a2
commit 68650cfbdc
7 changed files with 127 additions and 48 deletions

View File

@ -25,7 +25,6 @@ from distil.db import api as db_api
from distil import exceptions as exc
from distil import transformer as d_transformer
from distil.common import constants
from distil.common import general
from distil.common import openstack
LOG = logging.getLogger(__name__)
@ -46,7 +45,7 @@ class BaseCollector(object):
def get_meter(self, project, meter, start, end):
raise NotImplementedError
def collect_usage(self, project, start, end):
def collect_usage(self, project, windows):
"""Collect usage for specific tenant.
:return: True if no error happened otherwise return False.
@ -54,15 +53,6 @@ class BaseCollector(object):
LOG.info('collect_usage by %s for project: %s(%s)' %
(self.__class__.__name__, project['id'], project['name']))
windows = list(general.generate_windows(start, end))
if CONF.collector.max_windows_per_cycle > 0:
windows = windows[:CONF.collector.max_windows_per_cycle]
if not windows:
LOG.info("Skipped project %s(%s), less than 1 hour since last "
"collection time.", project['id'], project['name'])
for window_start, window_end in windows:
LOG.info("Project %s(%s) slice %s %s", project['id'],
project['name'], window_start, window_end)

View File

@ -46,14 +46,22 @@ def get_transformer_config():
return _TRANS_CONFIG
def generate_windows(start, end):
"""Generator for configured hour windows in a given range."""
def get_windows(start, end):
"""Get configured hour windows in a given range."""
windows = []
window_size = timedelta(hours=CONF.collector.collect_window)
while start + window_size <= end:
window_end = start + window_size
yield start, window_end
windows.append((start, window_end))
if len(windows) >= CONF.collector.max_windows_per_cycle:
break
start = window_end
return windows
def log_and_time_it(f):
def decorator(*args, **kwargs):

View File

@ -52,7 +52,7 @@ COLLECTOR_OPTS = [
help=('Window of usage collection in hours.')),
cfg.StrOpt('collector_backend', default='ceilometer',
help=('Data collector.')),
cfg.IntOpt('max_windows_per_cycle', default=0,
cfg.IntOpt('max_windows_per_cycle', default=1,
help=('The maximum number of windows per collecting cycle.')),
cfg.StrOpt('meter_mappings_file', default='/etc/distil/meter_mappings.yml',
help=('The meter mappings configuration.')),
@ -110,6 +110,15 @@ RATER_OPTS = [
'is "file".'),
]
CLI_OPTS = [
cfg.StrOpt(
'collect-end-time',
help=('The end date of usage to collect before distil-collector is '
'stopped. If not provided, distil-collector will keep running. '
'Time format is %Y-%m-%dT%H:%M:%S')
),
]
AUTH_GROUP = 'keystone_authtoken'
ODOO_GROUP = 'odoo'
COLLECTOR_GROUP = 'collector'
@ -120,6 +129,7 @@ CONF.register_opts(DEFAULT_OPTIONS)
CONF.register_opts(ODOO_OPTS, group=ODOO_GROUP)
CONF.register_opts(COLLECTOR_OPTS, group=COLLECTOR_GROUP)
CONF.register_opts(RATER_OPTS, group=RATER_GROUP)
CONF.register_cli_opts(CLI_OPTS)
def list_opts():

View File

@ -13,6 +13,7 @@
# under the License.
from datetime import datetime
import os
from random import shuffle
from oslo_config import cfg
@ -23,6 +24,7 @@ from stevedore import driver
from distil.db import api as db_api
from distil import exceptions
from distil.common import constants
from distil.common import general
from distil.common import openstack
@ -110,6 +112,11 @@ class CollectorService(service.Service):
def collect_usage(self):
LOG.info("Starting to collect usage...")
if CONF.collector.max_windows_per_cycle <= 0:
LOG.info("Finished collecting usage with configuration "
"max_windows_per_cycle<=0.")
return
projects = openstack.get_projects()
project_ids = [p['id'] for p in projects]
valid_projects = filter_projects(projects)
@ -119,7 +126,15 @@ class CollectorService(service.Service):
last_collect = db_api.get_last_collect(project_ids).last_collected
end = datetime.utcnow().replace(minute=0, second=0, microsecond=0)
count = 0
if CONF.collect_end_time:
end = datetime.strptime(CONF.collect_end_time, constants.iso_time)
# Number of projects updated successfully.
success_count = 0
# Number of projects processed actually.
processed_count = 0
# Number of projects already up-to-date.
updated_count = 0
valid_projects = self._get_projects_by_order(valid_projects)
for project in valid_projects:
@ -127,7 +142,6 @@ class CollectorService(service.Service):
# instance. If no, will get a lock and continue processing,
# otherwise just skip it.
locks = db_api.get_project_locks(project['id'])
if locks and locks[0].owner != self.identifier:
LOG.debug(
"Project %s is being processed by collector %s." %
@ -137,13 +151,31 @@ class CollectorService(service.Service):
try:
with db_api.project_lock(project['id'], self.identifier):
processed_count += 1
# Add a project or get last_collected of existing project.
db_project = db_api.project_add(project, last_collect)
start = db_project.last_collected
if self.collector.collect_usage(project, start, end):
count = count + 1
windows = general.get_windows(start, end)
if not windows:
LOG.info(
"project %s(%s) already up-to-date.",
project['id'], project['name']
)
updated_count += 1
continue
if self.collector.collect_usage(project, windows):
success_count += 1
except Exception:
LOG.warning('Get lock failed. Process: %s' % self.identifier)
LOG.info("Finished collecting usage for %s projects." % count)
LOG.info("Finished collecting usage for %s projects." % success_count)
# If we start distil-collector manually with 'collect_end_time' param
# specified, the service should be stopped automatically after all
# projects usage collection is up-to-date.
if CONF.collect_end_time and updated_count == processed_count:
self.stop()
os.kill(os.getpid(), 9)

View File

@ -97,6 +97,7 @@ class DistilTestCase(base.BaseTestCase):
"""
for k, v in kw.items():
self.conf.set_override(k, v, group)
self.addCleanup(self.conf.clear_override, k, group)
def _my_dir(self):
return os.path.abspath(os.path.dirname(__file__))

View File

@ -148,8 +148,7 @@ class CollectorBaseTest(base.DistilWithDbTestCase):
srv = collector.CollectorService()
ret = srv.collector.collect_usage(
{'name': 'fake_project', 'id': '123'},
datetime.utcnow() - timedelta(hours=1.5),
datetime.utcnow()
[(datetime.utcnow() - timedelta(hours=1), datetime.utcnow())]
)
self.assertFalse(ret)

View File

@ -13,6 +13,7 @@
# under the License.
from datetime import datetime
from datetime import timedelta
import hashlib
import json
import os
@ -20,6 +21,8 @@ import os
import mock
from distil.collector import base as collector_base
from distil.common import constants
from distil import config
from distil.db.sqlalchemy import api as db_api
from distil.service import collector
from distil.tests.unit import base
@ -86,7 +89,7 @@ class CollectorTest(base.DistilWithDbTestCase):
]
collector = collector_base.BaseCollector()
collector.collect_usage(project, start_time, end_time)
collector.collect_usage(project, [(start_time, end_time)])
resources = db_api.resource_get_by_ids(project_id, [resource_id_hash])
res_info = json.loads(resources[0].info)
@ -144,22 +147,17 @@ class CollectorTest(base.DistilWithDbTestCase):
project_2_collect
)
end = datetime.utcnow().replace(minute=0, second=0, microsecond=0)
svc = collector.CollectorService()
svc.collect_usage()
mock_collect_usage.assert_called_once_with(
{'id': '333', 'name': 'project_3', 'description': ''},
project_1_collect,
end
[(project_1_collect, project_1_collect + timedelta(hours=1))]
)
@mock.patch('distil.db.api.get_project_locks')
@mock.patch('distil.common.openstack.get_ceilometer_client')
@mock.patch('distil.common.openstack.get_projects')
def test_project_order_ascending(self, mock_get_projects, mock_cclient,
mock_getlocks):
def test_project_order_ascending(self, mock_get_projects, mock_cclient):
mock_get_projects.return_value = [
{'id': '111', 'name': 'project_1', 'description': ''},
{'id': '222', 'name': 'project_2', 'description': ''},
@ -174,23 +172,25 @@ class CollectorTest(base.DistilWithDbTestCase):
'name': 'project_1',
'description': '',
},
datetime(2017, 5, 17, 19)
datetime.utcnow() - timedelta(hours=2)
)
svc = collector.CollectorService()
svc.collector = mock.Mock()
svc.collect_usage()
expected_projects = []
for call in svc.collector.collect_usage.call_args_list:
expected_projects.append(call[0][0]['id'])
self.assertEqual(
[mock.call('111'), mock.call('222'), mock.call('333'),
mock.call('444')],
mock_getlocks.call_args_list
['111', '222', '333', '444'],
expected_projects
)
@mock.patch('distil.db.api.get_project_locks')
@mock.patch('distil.common.openstack.get_ceilometer_client')
@mock.patch('distil.common.openstack.get_projects')
def test_project_order_descending(self, mock_get_projects, mock_cclient,
mock_getlocks):
def test_project_order_descending(self, mock_get_projects, mock_cclient):
self.override_config('collector', project_order='descending')
mock_get_projects.return_value = [
@ -207,23 +207,25 @@ class CollectorTest(base.DistilWithDbTestCase):
'name': 'project_1',
'description': '',
},
datetime(2017, 5, 17, 19)
datetime.utcnow() - timedelta(hours=2)
)
svc = collector.CollectorService()
svc.collector = mock.Mock()
svc.collect_usage()
expected_projects = []
for call in svc.collector.collect_usage.call_args_list:
expected_projects.append(call[0][0]['id'])
self.assertEqual(
[mock.call('444'), mock.call('333'), mock.call('222'),
mock.call('111')],
mock_getlocks.call_args_list
['444', '333', '222', '111'],
expected_projects
)
@mock.patch('distil.db.api.get_project_locks')
@mock.patch('distil.common.openstack.get_ceilometer_client')
@mock.patch('distil.common.openstack.get_projects')
def test_project_order_random(self, mock_get_projects, mock_cclient,
mock_getlocks):
def test_project_order_random(self, mock_get_projects, mock_cclient):
self.override_config('collector', project_order='random')
mock_get_projects.return_value = [
@ -240,14 +242,51 @@ class CollectorTest(base.DistilWithDbTestCase):
'name': 'project_1',
'description': '',
},
datetime(2017, 5, 17, 19)
datetime.utcnow() - timedelta(hours=2)
)
svc = collector.CollectorService()
svc.collector = mock.Mock()
svc.collect_usage()
expected_projects = []
for call in svc.collector.collect_usage.call_args_list:
expected_projects.append(call[0][0]['id'])
self.assertNotEqual(
[mock.call('111'), mock.call('222'), mock.call('333'),
mock.call('444')],
mock_getlocks.call_args_list
['111', '222', '333', '444'],
expected_projects
)
@mock.patch('os.kill')
@mock.patch('distil.common.openstack.get_ceilometer_client')
@mock.patch('distil.common.openstack.get_projects')
def test_collect_with_end_time(self, mock_get_projects, mock_cclient,
mock_kill):
end_time = datetime.utcnow() + timedelta(hours=0.5)
end_time_str = end_time.strftime(constants.iso_time)
self.override_config(collect_end_time=end_time_str)
mock_get_projects.return_value = [
{
'id': '111',
'name': 'project_1',
'description': 'description'
}
]
# Insert the project info in the database.
db_api.project_add(
{
'id': '111',
'name': 'project_1',
'description': '',
},
datetime.utcnow()
)
srv = collector.CollectorService()
srv.thread_grp = mock.Mock()
srv.collect_usage()
self.assertEqual(1, srv.thread_grp.stop.call_count)
self.assertEqual(1, mock_kill.call_count)