Merge "Add 'collect_end_time' param for distil-collector command"

This commit is contained in:
Jenkins 2017-08-04 01:50:05 +00:00 committed by Gerrit Code Review
commit e2dbdd4b50
7 changed files with 127 additions and 48 deletions

View File

@ -25,7 +25,6 @@ from distil.db import api as db_api
from distil import exceptions as exc
from distil import transformer as d_transformer
from distil.common import constants
from distil.common import general
from distil.common import openstack
LOG = logging.getLogger(__name__)
@ -46,7 +45,7 @@ class BaseCollector(object):
def get_meter(self, project, meter, start, end):
raise NotImplementedError
def collect_usage(self, project, start, end):
def collect_usage(self, project, windows):
"""Collect usage for specific tenant.
:return: True if no error happened otherwise return False.
@ -54,15 +53,6 @@ class BaseCollector(object):
LOG.info('collect_usage by %s for project: %s(%s)' %
(self.__class__.__name__, project['id'], project['name']))
windows = list(general.generate_windows(start, end))
if CONF.collector.max_windows_per_cycle > 0:
windows = windows[:CONF.collector.max_windows_per_cycle]
if not windows:
LOG.info("Skipped project %s(%s), less than 1 hour since last "
"collection time.", project['id'], project['name'])
for window_start, window_end in windows:
LOG.info("Project %s(%s) slice %s %s", project['id'],
project['name'], window_start, window_end)

View File

@ -46,14 +46,22 @@ def get_transformer_config():
return _TRANS_CONFIG
def generate_windows(start, end):
"""Generator for configured hour windows in a given range."""
def get_windows(start, end):
"""Get configured hour windows in a given range."""
windows = []
window_size = timedelta(hours=CONF.collector.collect_window)
while start + window_size <= end:
window_end = start + window_size
yield start, window_end
windows.append((start, window_end))
if len(windows) >= CONF.collector.max_windows_per_cycle:
break
start = window_end
return windows
def log_and_time_it(f):
def decorator(*args, **kwargs):

View File

@ -52,7 +52,7 @@ COLLECTOR_OPTS = [
help=('Window of usage collection in hours.')),
cfg.StrOpt('collector_backend', default='ceilometer',
help=('Data collector.')),
cfg.IntOpt('max_windows_per_cycle', default=0,
cfg.IntOpt('max_windows_per_cycle', default=1,
help=('The maximum number of windows per collecting cycle.')),
cfg.StrOpt('meter_mappings_file', default='/etc/distil/meter_mappings.yml',
help=('The meter mappings configuration.')),
@ -110,6 +110,15 @@ RATER_OPTS = [
'is "file".'),
]
CLI_OPTS = [
cfg.StrOpt(
'collect-end-time',
help=('The end date of usage to collect before distil-collector is '
'stopped. If not provided, distil-collector will keep running. '
'Time format is %Y-%m-%dT%H:%M:%S')
),
]
AUTH_GROUP = 'keystone_authtoken'
ODOO_GROUP = 'odoo'
COLLECTOR_GROUP = 'collector'
@ -120,6 +129,7 @@ CONF.register_opts(DEFAULT_OPTIONS)
CONF.register_opts(ODOO_OPTS, group=ODOO_GROUP)
CONF.register_opts(COLLECTOR_OPTS, group=COLLECTOR_GROUP)
CONF.register_opts(RATER_OPTS, group=RATER_GROUP)
CONF.register_cli_opts(CLI_OPTS)
def list_opts():

View File

@ -13,6 +13,7 @@
# under the License.
from datetime import datetime
import os
from random import shuffle
from oslo_config import cfg
@ -23,6 +24,7 @@ from stevedore import driver
from distil.db import api as db_api
from distil import exceptions
from distil.common import constants
from distil.common import general
from distil.common import openstack
@ -110,6 +112,11 @@ class CollectorService(service.Service):
def collect_usage(self):
LOG.info("Starting to collect usage...")
if CONF.collector.max_windows_per_cycle <= 0:
LOG.info("Finished collecting usage with configuration "
"max_windows_per_cycle<=0.")
return
projects = openstack.get_projects()
project_ids = [p['id'] for p in projects]
valid_projects = filter_projects(projects)
@ -119,7 +126,15 @@ class CollectorService(service.Service):
last_collect = db_api.get_last_collect(project_ids).last_collected
end = datetime.utcnow().replace(minute=0, second=0, microsecond=0)
count = 0
if CONF.collect_end_time:
end = datetime.strptime(CONF.collect_end_time, constants.iso_time)
# Number of projects updated successfully.
success_count = 0
# Number of projects processed actually.
processed_count = 0
# Number of projects already up-to-date.
updated_count = 0
valid_projects = self._get_projects_by_order(valid_projects)
for project in valid_projects:
@ -127,7 +142,6 @@ class CollectorService(service.Service):
# instance. If no, will get a lock and continue processing,
# otherwise just skip it.
locks = db_api.get_project_locks(project['id'])
if locks and locks[0].owner != self.identifier:
LOG.debug(
"Project %s is being processed by collector %s." %
@ -137,13 +151,31 @@ class CollectorService(service.Service):
try:
with db_api.project_lock(project['id'], self.identifier):
processed_count += 1
# Add a project or get last_collected of existing project.
db_project = db_api.project_add(project, last_collect)
start = db_project.last_collected
if self.collector.collect_usage(project, start, end):
count = count + 1
windows = general.get_windows(start, end)
if not windows:
LOG.info(
"project %s(%s) already up-to-date.",
project['id'], project['name']
)
updated_count += 1
continue
if self.collector.collect_usage(project, windows):
success_count += 1
except Exception:
LOG.warning('Get lock failed. Process: %s' % self.identifier)
LOG.info("Finished collecting usage for %s projects." % count)
LOG.info("Finished collecting usage for %s projects." % success_count)
# If we start distil-collector manually with 'collect_end_time' param
# specified, the service should be stopped automatically after all
# projects usage collection is up-to-date.
if CONF.collect_end_time and updated_count == processed_count:
self.stop()
os.kill(os.getpid(), 9)

View File

@ -97,6 +97,7 @@ class DistilTestCase(base.BaseTestCase):
"""
for k, v in kw.items():
self.conf.set_override(k, v, group)
self.addCleanup(self.conf.clear_override, k, group)
def _my_dir(self):
return os.path.abspath(os.path.dirname(__file__))

View File

@ -148,8 +148,7 @@ class CollectorBaseTest(base.DistilWithDbTestCase):
srv = collector.CollectorService()
ret = srv.collector.collect_usage(
{'name': 'fake_project', 'id': '123'},
datetime.utcnow() - timedelta(hours=1.5),
datetime.utcnow()
[(datetime.utcnow() - timedelta(hours=1), datetime.utcnow())]
)
self.assertFalse(ret)

View File

@ -13,6 +13,7 @@
# under the License.
from datetime import datetime
from datetime import timedelta
import hashlib
import json
import os
@ -20,6 +21,8 @@ import os
import mock
from distil.collector import base as collector_base
from distil.common import constants
from distil import config
from distil.db.sqlalchemy import api as db_api
from distil.service import collector
from distil.tests.unit import base
@ -86,7 +89,7 @@ class CollectorTest(base.DistilWithDbTestCase):
]
collector = collector_base.BaseCollector()
collector.collect_usage(project, start_time, end_time)
collector.collect_usage(project, [(start_time, end_time)])
resources = db_api.resource_get_by_ids(project_id, [resource_id_hash])
res_info = json.loads(resources[0].info)
@ -144,22 +147,17 @@ class CollectorTest(base.DistilWithDbTestCase):
project_2_collect
)
end = datetime.utcnow().replace(minute=0, second=0, microsecond=0)
svc = collector.CollectorService()
svc.collect_usage()
mock_collect_usage.assert_called_once_with(
{'id': '333', 'name': 'project_3', 'description': ''},
project_1_collect,
end
[(project_1_collect, project_1_collect + timedelta(hours=1))]
)
@mock.patch('distil.db.api.get_project_locks')
@mock.patch('distil.common.openstack.get_ceilometer_client')
@mock.patch('distil.common.openstack.get_projects')
def test_project_order_ascending(self, mock_get_projects, mock_cclient,
mock_getlocks):
def test_project_order_ascending(self, mock_get_projects, mock_cclient):
mock_get_projects.return_value = [
{'id': '111', 'name': 'project_1', 'description': ''},
{'id': '222', 'name': 'project_2', 'description': ''},
@ -174,23 +172,25 @@ class CollectorTest(base.DistilWithDbTestCase):
'name': 'project_1',
'description': '',
},
datetime(2017, 5, 17, 19)
datetime.utcnow() - timedelta(hours=2)
)
svc = collector.CollectorService()
svc.collector = mock.Mock()
svc.collect_usage()
expected_projects = []
for call in svc.collector.collect_usage.call_args_list:
expected_projects.append(call[0][0]['id'])
self.assertEqual(
[mock.call('111'), mock.call('222'), mock.call('333'),
mock.call('444')],
mock_getlocks.call_args_list
['111', '222', '333', '444'],
expected_projects
)
@mock.patch('distil.db.api.get_project_locks')
@mock.patch('distil.common.openstack.get_ceilometer_client')
@mock.patch('distil.common.openstack.get_projects')
def test_project_order_descending(self, mock_get_projects, mock_cclient,
mock_getlocks):
def test_project_order_descending(self, mock_get_projects, mock_cclient):
self.override_config('collector', project_order='descending')
mock_get_projects.return_value = [
@ -207,23 +207,25 @@ class CollectorTest(base.DistilWithDbTestCase):
'name': 'project_1',
'description': '',
},
datetime(2017, 5, 17, 19)
datetime.utcnow() - timedelta(hours=2)
)
svc = collector.CollectorService()
svc.collector = mock.Mock()
svc.collect_usage()
expected_projects = []
for call in svc.collector.collect_usage.call_args_list:
expected_projects.append(call[0][0]['id'])
self.assertEqual(
[mock.call('444'), mock.call('333'), mock.call('222'),
mock.call('111')],
mock_getlocks.call_args_list
['444', '333', '222', '111'],
expected_projects
)
@mock.patch('distil.db.api.get_project_locks')
@mock.patch('distil.common.openstack.get_ceilometer_client')
@mock.patch('distil.common.openstack.get_projects')
def test_project_order_random(self, mock_get_projects, mock_cclient,
mock_getlocks):
def test_project_order_random(self, mock_get_projects, mock_cclient):
self.override_config('collector', project_order='random')
mock_get_projects.return_value = [
@ -240,14 +242,51 @@ class CollectorTest(base.DistilWithDbTestCase):
'name': 'project_1',
'description': '',
},
datetime(2017, 5, 17, 19)
datetime.utcnow() - timedelta(hours=2)
)
svc = collector.CollectorService()
svc.collector = mock.Mock()
svc.collect_usage()
expected_projects = []
for call in svc.collector.collect_usage.call_args_list:
expected_projects.append(call[0][0]['id'])
self.assertNotEqual(
[mock.call('111'), mock.call('222'), mock.call('333'),
mock.call('444')],
mock_getlocks.call_args_list
['111', '222', '333', '444'],
expected_projects
)
@mock.patch('os.kill')
@mock.patch('distil.common.openstack.get_ceilometer_client')
@mock.patch('distil.common.openstack.get_projects')
def test_collect_with_end_time(self, mock_get_projects, mock_cclient,
mock_kill):
end_time = datetime.utcnow() + timedelta(hours=0.5)
end_time_str = end_time.strftime(constants.iso_time)
self.override_config(collect_end_time=end_time_str)
mock_get_projects.return_value = [
{
'id': '111',
'name': 'project_1',
'description': 'description'
}
]
# Insert the project info in the database.
db_api.project_add(
{
'id': '111',
'name': 'project_1',
'description': '',
},
datetime.utcnow()
)
srv = collector.CollectorService()
srv.thread_grp = mock.Mock()
srv.collect_usage()
self.assertEqual(1, srv.thread_grp.stop.call_count)
self.assertEqual(1, mock_kill.call_count)