Add 'collect_end_time' param for distil-collector command
This change give us a convenient way to do usage collection catch up to specific date. By default the end time when collection happends is now() and the collection service will keep running infinitely. This patch adds a command line param called 'collect_end_time'. If we start distil-collector manually with 'collect_end_time' param specified, the service should be stopped automatically after all projects usage collection is up-to-date. Change-Id: I0d0aebcf3dfeeb987155bb8d92f2aee1918ceac8
This commit is contained in:
parent
d906c733a2
commit
68650cfbdc
|
@ -25,7 +25,6 @@ from distil.db import api as db_api
|
|||
from distil import exceptions as exc
|
||||
from distil import transformer as d_transformer
|
||||
from distil.common import constants
|
||||
from distil.common import general
|
||||
from distil.common import openstack
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
@ -46,7 +45,7 @@ class BaseCollector(object):
|
|||
def get_meter(self, project, meter, start, end):
|
||||
raise NotImplementedError
|
||||
|
||||
def collect_usage(self, project, start, end):
|
||||
def collect_usage(self, project, windows):
|
||||
"""Collect usage for specific tenant.
|
||||
|
||||
:return: True if no error happened otherwise return False.
|
||||
|
@ -54,15 +53,6 @@ class BaseCollector(object):
|
|||
LOG.info('collect_usage by %s for project: %s(%s)' %
|
||||
(self.__class__.__name__, project['id'], project['name']))
|
||||
|
||||
windows = list(general.generate_windows(start, end))
|
||||
|
||||
if CONF.collector.max_windows_per_cycle > 0:
|
||||
windows = windows[:CONF.collector.max_windows_per_cycle]
|
||||
|
||||
if not windows:
|
||||
LOG.info("Skipped project %s(%s), less than 1 hour since last "
|
||||
"collection time.", project['id'], project['name'])
|
||||
|
||||
for window_start, window_end in windows:
|
||||
LOG.info("Project %s(%s) slice %s %s", project['id'],
|
||||
project['name'], window_start, window_end)
|
||||
|
|
|
@ -46,14 +46,22 @@ def get_transformer_config():
|
|||
return _TRANS_CONFIG
|
||||
|
||||
|
||||
def generate_windows(start, end):
|
||||
"""Generator for configured hour windows in a given range."""
|
||||
def get_windows(start, end):
|
||||
"""Get configured hour windows in a given range."""
|
||||
windows = []
|
||||
window_size = timedelta(hours=CONF.collector.collect_window)
|
||||
|
||||
while start + window_size <= end:
|
||||
window_end = start + window_size
|
||||
yield start, window_end
|
||||
windows.append((start, window_end))
|
||||
|
||||
if len(windows) >= CONF.collector.max_windows_per_cycle:
|
||||
break
|
||||
|
||||
start = window_end
|
||||
|
||||
return windows
|
||||
|
||||
|
||||
def log_and_time_it(f):
|
||||
def decorator(*args, **kwargs):
|
||||
|
|
|
@ -52,7 +52,7 @@ COLLECTOR_OPTS = [
|
|||
help=('Window of usage collection in hours.')),
|
||||
cfg.StrOpt('collector_backend', default='ceilometer',
|
||||
help=('Data collector.')),
|
||||
cfg.IntOpt('max_windows_per_cycle', default=0,
|
||||
cfg.IntOpt('max_windows_per_cycle', default=1,
|
||||
help=('The maximum number of windows per collecting cycle.')),
|
||||
cfg.StrOpt('meter_mappings_file', default='/etc/distil/meter_mappings.yml',
|
||||
help=('The meter mappings configuration.')),
|
||||
|
@ -110,6 +110,15 @@ RATER_OPTS = [
|
|||
'is "file".'),
|
||||
]
|
||||
|
||||
CLI_OPTS = [
|
||||
cfg.StrOpt(
|
||||
'collect-end-time',
|
||||
help=('The end date of usage to collect before distil-collector is '
|
||||
'stopped. If not provided, distil-collector will keep running. '
|
||||
'Time format is %Y-%m-%dT%H:%M:%S')
|
||||
),
|
||||
]
|
||||
|
||||
AUTH_GROUP = 'keystone_authtoken'
|
||||
ODOO_GROUP = 'odoo'
|
||||
COLLECTOR_GROUP = 'collector'
|
||||
|
@ -120,6 +129,7 @@ CONF.register_opts(DEFAULT_OPTIONS)
|
|||
CONF.register_opts(ODOO_OPTS, group=ODOO_GROUP)
|
||||
CONF.register_opts(COLLECTOR_OPTS, group=COLLECTOR_GROUP)
|
||||
CONF.register_opts(RATER_OPTS, group=RATER_GROUP)
|
||||
CONF.register_cli_opts(CLI_OPTS)
|
||||
|
||||
|
||||
def list_opts():
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
# under the License.
|
||||
|
||||
from datetime import datetime
|
||||
import os
|
||||
from random import shuffle
|
||||
|
||||
from oslo_config import cfg
|
||||
|
@ -23,6 +24,7 @@ from stevedore import driver
|
|||
|
||||
from distil.db import api as db_api
|
||||
from distil import exceptions
|
||||
from distil.common import constants
|
||||
from distil.common import general
|
||||
from distil.common import openstack
|
||||
|
||||
|
@ -110,6 +112,11 @@ class CollectorService(service.Service):
|
|||
def collect_usage(self):
|
||||
LOG.info("Starting to collect usage...")
|
||||
|
||||
if CONF.collector.max_windows_per_cycle <= 0:
|
||||
LOG.info("Finished collecting usage with configuration "
|
||||
"max_windows_per_cycle<=0.")
|
||||
return
|
||||
|
||||
projects = openstack.get_projects()
|
||||
project_ids = [p['id'] for p in projects]
|
||||
valid_projects = filter_projects(projects)
|
||||
|
@ -119,7 +126,15 @@ class CollectorService(service.Service):
|
|||
last_collect = db_api.get_last_collect(project_ids).last_collected
|
||||
|
||||
end = datetime.utcnow().replace(minute=0, second=0, microsecond=0)
|
||||
count = 0
|
||||
if CONF.collect_end_time:
|
||||
end = datetime.strptime(CONF.collect_end_time, constants.iso_time)
|
||||
|
||||
# Number of projects updated successfully.
|
||||
success_count = 0
|
||||
# Number of projects processed actually.
|
||||
processed_count = 0
|
||||
# Number of projects already up-to-date.
|
||||
updated_count = 0
|
||||
|
||||
valid_projects = self._get_projects_by_order(valid_projects)
|
||||
for project in valid_projects:
|
||||
|
@ -127,7 +142,6 @@ class CollectorService(service.Service):
|
|||
# instance. If no, will get a lock and continue processing,
|
||||
# otherwise just skip it.
|
||||
locks = db_api.get_project_locks(project['id'])
|
||||
|
||||
if locks and locks[0].owner != self.identifier:
|
||||
LOG.debug(
|
||||
"Project %s is being processed by collector %s." %
|
||||
|
@ -137,13 +151,31 @@ class CollectorService(service.Service):
|
|||
|
||||
try:
|
||||
with db_api.project_lock(project['id'], self.identifier):
|
||||
processed_count += 1
|
||||
|
||||
# Add a project or get last_collected of existing project.
|
||||
db_project = db_api.project_add(project, last_collect)
|
||||
start = db_project.last_collected
|
||||
|
||||
if self.collector.collect_usage(project, start, end):
|
||||
count = count + 1
|
||||
windows = general.get_windows(start, end)
|
||||
if not windows:
|
||||
LOG.info(
|
||||
"project %s(%s) already up-to-date.",
|
||||
project['id'], project['name']
|
||||
)
|
||||
updated_count += 1
|
||||
continue
|
||||
|
||||
if self.collector.collect_usage(project, windows):
|
||||
success_count += 1
|
||||
except Exception:
|
||||
LOG.warning('Get lock failed. Process: %s' % self.identifier)
|
||||
|
||||
LOG.info("Finished collecting usage for %s projects." % count)
|
||||
LOG.info("Finished collecting usage for %s projects." % success_count)
|
||||
|
||||
# If we start distil-collector manually with 'collect_end_time' param
|
||||
# specified, the service should be stopped automatically after all
|
||||
# projects usage collection is up-to-date.
|
||||
if CONF.collect_end_time and updated_count == processed_count:
|
||||
self.stop()
|
||||
os.kill(os.getpid(), 9)
|
||||
|
|
|
@ -97,6 +97,7 @@ class DistilTestCase(base.BaseTestCase):
|
|||
"""
|
||||
for k, v in kw.items():
|
||||
self.conf.set_override(k, v, group)
|
||||
self.addCleanup(self.conf.clear_override, k, group)
|
||||
|
||||
def _my_dir(self):
|
||||
return os.path.abspath(os.path.dirname(__file__))
|
||||
|
|
|
@ -148,8 +148,7 @@ class CollectorBaseTest(base.DistilWithDbTestCase):
|
|||
srv = collector.CollectorService()
|
||||
ret = srv.collector.collect_usage(
|
||||
{'name': 'fake_project', 'id': '123'},
|
||||
datetime.utcnow() - timedelta(hours=1.5),
|
||||
datetime.utcnow()
|
||||
[(datetime.utcnow() - timedelta(hours=1), datetime.utcnow())]
|
||||
)
|
||||
|
||||
self.assertFalse(ret)
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
# under the License.
|
||||
|
||||
from datetime import datetime
|
||||
from datetime import timedelta
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
|
@ -20,6 +21,8 @@ import os
|
|||
import mock
|
||||
|
||||
from distil.collector import base as collector_base
|
||||
from distil.common import constants
|
||||
from distil import config
|
||||
from distil.db.sqlalchemy import api as db_api
|
||||
from distil.service import collector
|
||||
from distil.tests.unit import base
|
||||
|
@ -86,7 +89,7 @@ class CollectorTest(base.DistilWithDbTestCase):
|
|||
]
|
||||
|
||||
collector = collector_base.BaseCollector()
|
||||
collector.collect_usage(project, start_time, end_time)
|
||||
collector.collect_usage(project, [(start_time, end_time)])
|
||||
|
||||
resources = db_api.resource_get_by_ids(project_id, [resource_id_hash])
|
||||
res_info = json.loads(resources[0].info)
|
||||
|
@ -144,22 +147,17 @@ class CollectorTest(base.DistilWithDbTestCase):
|
|||
project_2_collect
|
||||
)
|
||||
|
||||
end = datetime.utcnow().replace(minute=0, second=0, microsecond=0)
|
||||
|
||||
svc = collector.CollectorService()
|
||||
svc.collect_usage()
|
||||
|
||||
mock_collect_usage.assert_called_once_with(
|
||||
{'id': '333', 'name': 'project_3', 'description': ''},
|
||||
project_1_collect,
|
||||
end
|
||||
[(project_1_collect, project_1_collect + timedelta(hours=1))]
|
||||
)
|
||||
|
||||
@mock.patch('distil.db.api.get_project_locks')
|
||||
@mock.patch('distil.common.openstack.get_ceilometer_client')
|
||||
@mock.patch('distil.common.openstack.get_projects')
|
||||
def test_project_order_ascending(self, mock_get_projects, mock_cclient,
|
||||
mock_getlocks):
|
||||
def test_project_order_ascending(self, mock_get_projects, mock_cclient):
|
||||
mock_get_projects.return_value = [
|
||||
{'id': '111', 'name': 'project_1', 'description': ''},
|
||||
{'id': '222', 'name': 'project_2', 'description': ''},
|
||||
|
@ -174,23 +172,25 @@ class CollectorTest(base.DistilWithDbTestCase):
|
|||
'name': 'project_1',
|
||||
'description': '',
|
||||
},
|
||||
datetime(2017, 5, 17, 19)
|
||||
datetime.utcnow() - timedelta(hours=2)
|
||||
)
|
||||
|
||||
svc = collector.CollectorService()
|
||||
svc.collector = mock.Mock()
|
||||
svc.collect_usage()
|
||||
|
||||
expected_projects = []
|
||||
for call in svc.collector.collect_usage.call_args_list:
|
||||
expected_projects.append(call[0][0]['id'])
|
||||
|
||||
self.assertEqual(
|
||||
[mock.call('111'), mock.call('222'), mock.call('333'),
|
||||
mock.call('444')],
|
||||
mock_getlocks.call_args_list
|
||||
['111', '222', '333', '444'],
|
||||
expected_projects
|
||||
)
|
||||
|
||||
@mock.patch('distil.db.api.get_project_locks')
|
||||
@mock.patch('distil.common.openstack.get_ceilometer_client')
|
||||
@mock.patch('distil.common.openstack.get_projects')
|
||||
def test_project_order_descending(self, mock_get_projects, mock_cclient,
|
||||
mock_getlocks):
|
||||
def test_project_order_descending(self, mock_get_projects, mock_cclient):
|
||||
self.override_config('collector', project_order='descending')
|
||||
|
||||
mock_get_projects.return_value = [
|
||||
|
@ -207,23 +207,25 @@ class CollectorTest(base.DistilWithDbTestCase):
|
|||
'name': 'project_1',
|
||||
'description': '',
|
||||
},
|
||||
datetime(2017, 5, 17, 19)
|
||||
datetime.utcnow() - timedelta(hours=2)
|
||||
)
|
||||
|
||||
svc = collector.CollectorService()
|
||||
svc.collector = mock.Mock()
|
||||
svc.collect_usage()
|
||||
|
||||
expected_projects = []
|
||||
for call in svc.collector.collect_usage.call_args_list:
|
||||
expected_projects.append(call[0][0]['id'])
|
||||
|
||||
self.assertEqual(
|
||||
[mock.call('444'), mock.call('333'), mock.call('222'),
|
||||
mock.call('111')],
|
||||
mock_getlocks.call_args_list
|
||||
['444', '333', '222', '111'],
|
||||
expected_projects
|
||||
)
|
||||
|
||||
@mock.patch('distil.db.api.get_project_locks')
|
||||
@mock.patch('distil.common.openstack.get_ceilometer_client')
|
||||
@mock.patch('distil.common.openstack.get_projects')
|
||||
def test_project_order_random(self, mock_get_projects, mock_cclient,
|
||||
mock_getlocks):
|
||||
def test_project_order_random(self, mock_get_projects, mock_cclient):
|
||||
self.override_config('collector', project_order='random')
|
||||
|
||||
mock_get_projects.return_value = [
|
||||
|
@ -240,14 +242,51 @@ class CollectorTest(base.DistilWithDbTestCase):
|
|||
'name': 'project_1',
|
||||
'description': '',
|
||||
},
|
||||
datetime(2017, 5, 17, 19)
|
||||
datetime.utcnow() - timedelta(hours=2)
|
||||
)
|
||||
|
||||
svc = collector.CollectorService()
|
||||
svc.collector = mock.Mock()
|
||||
svc.collect_usage()
|
||||
|
||||
expected_projects = []
|
||||
for call in svc.collector.collect_usage.call_args_list:
|
||||
expected_projects.append(call[0][0]['id'])
|
||||
|
||||
self.assertNotEqual(
|
||||
[mock.call('111'), mock.call('222'), mock.call('333'),
|
||||
mock.call('444')],
|
||||
mock_getlocks.call_args_list
|
||||
['111', '222', '333', '444'],
|
||||
expected_projects
|
||||
)
|
||||
|
||||
@mock.patch('os.kill')
|
||||
@mock.patch('distil.common.openstack.get_ceilometer_client')
|
||||
@mock.patch('distil.common.openstack.get_projects')
|
||||
def test_collect_with_end_time(self, mock_get_projects, mock_cclient,
|
||||
mock_kill):
|
||||
end_time = datetime.utcnow() + timedelta(hours=0.5)
|
||||
end_time_str = end_time.strftime(constants.iso_time)
|
||||
self.override_config(collect_end_time=end_time_str)
|
||||
|
||||
mock_get_projects.return_value = [
|
||||
{
|
||||
'id': '111',
|
||||
'name': 'project_1',
|
||||
'description': 'description'
|
||||
}
|
||||
]
|
||||
# Insert the project info in the database.
|
||||
db_api.project_add(
|
||||
{
|
||||
'id': '111',
|
||||
'name': 'project_1',
|
||||
'description': '',
|
||||
},
|
||||
datetime.utcnow()
|
||||
)
|
||||
|
||||
srv = collector.CollectorService()
|
||||
srv.thread_grp = mock.Mock()
|
||||
srv.collect_usage()
|
||||
|
||||
self.assertEqual(1, srv.thread_grp.stop.call_count)
|
||||
self.assertEqual(1, mock_kill.call_count)
|
||||
|
|
Loading…
Reference in New Issue