diff --git a/distil/collector/base.py b/distil/collector/base.py index 0578eb0..ec6f874 100644 --- a/distil/collector/base.py +++ b/distil/collector/base.py @@ -13,6 +13,7 @@ # under the License. import abc +from datetime import timedelta import re import yaml @@ -64,11 +65,16 @@ class BaseCollector(object): resources = {} usage_entries = [] + # NOTE(kong): Set 10min as leadin time when getting samples, this + # helps us to get correct instance uptime, in case the instance + # status in first sample is not in our tracked status list. + actual_start = window_start - timedelta(minutes=10) + try: for mapping in self.meter_mappings: # Invoke get_meter function of specific collector. usage = self.get_meter(project['id'], mapping['meter'], - window_start, window_end) + actual_start, window_end) usage_by_resource = {} self._filter_and_group(usage, usage_by_resource) @@ -125,7 +131,9 @@ class BaseCollector(object): os_distro = image_meta.get('os_distro', 'unknown') else: # Boot from image - image_id = entry['metadata']['image.id'] + image_id = entry['metadata'].get('image.id', None) + image_id = image_id or entry['metadata'].get('image_ref', None) + os_distro = getattr( openstack.get_image(image_id), 'os_distro', @@ -180,6 +188,11 @@ class BaseCollector(object): service, entries, window_start, window_end ) + LOG.debug( + 'After transformation, usage for resource %s: %s' % + (res_id, transformed) + ) + if transformed: res_id = mapping.get('res_id_template', '%s') % res_id res_info = self._get_resource_info( @@ -192,7 +205,6 @@ class BaseCollector(object): res = resources.setdefault(res_id, res_info) res.update(res_info) - LOG.debug('resource info: %s', res) for service, volume in transformed.items(): entry = { @@ -205,4 +217,3 @@ class BaseCollector(object): 'tenant_id': project_id } usage_entries.append(entry) - LOG.debug('new entry: %s', entry) diff --git a/distil/common/general.py b/distil/common/general.py index 8a2061a..d093d1c 100644 --- a/distil/common/general.py +++ b/distil/common/general.py @@ -17,6 +17,7 @@ from datetime import timedelta from decimal import Decimal import functools import math +import socket import warnings import yaml @@ -101,3 +102,8 @@ def convert_to(value, from_unit, to_unit): if from_unit == to_unit: return value return conversions[from_unit][to_unit](value) + + +def get_process_identifier(): + """Gets current running process identifier.""" + return "%s_%s" % (socket.gethostname(), CONF.collector.partitioning_suffix) diff --git a/distil/config.py b/distil/config.py index e3f485c..a01d896 100644 --- a/distil/config.py +++ b/distil/config.py @@ -16,6 +16,7 @@ from keystoneauth1 import loading as ka_loading from oslo_cache import core as cache from oslo_config import cfg from oslo_log import log +from oslo_utils import uuidutils from distil import version @@ -65,6 +66,9 @@ COLLECTOR_OPTS = [ help=('The list of resources that handled by collector.')), cfg.StrOpt('dawn_of_time', default='2014-04-01 00:00:00', help=('The earlist starting time for new tenant.')), + cfg.StrOpt('partitioning_suffix', + help=('Collector partitioning group suffix. It is used when ' + 'running multiple collectors in favor of lock.')) ] ODOO_OPTS = [ diff --git a/distil/db/api.py b/distil/db/api.py index c1170e9..02ef0fa 100644 --- a/distil/db/api.py +++ b/distil/db/api.py @@ -31,6 +31,7 @@ interface. `sqlite:///var/lib/distil/distil.sqlite`. """ +import contextlib from oslo_config import cfg from oslo_db import api as db_api @@ -119,3 +120,27 @@ def project_get(project_id): def project_get_all(): return IMPL.project_get_all() + + +# Project Locks. + +def create_project_lock(project_id, owner): + return IMPL.create_project_lock(project_id, owner) + + +def get_project_locks(project_id): + return IMPL.get_project_locks(project_id) + + +def ensure_project_lock(project_id, owner): + return IMPL.ensure_project_lock(project_id, owner) + + +def delete_project_lock(project_id): + return IMPL.delete_project_lock(project_id) + + +@contextlib.contextmanager +def project_lock(project_id, owner): + with IMPL.project_lock(project_id, owner): + yield diff --git a/distil/db/migration/alembic_migrations/versions/002_add_project_lock_table.py b/distil/db/migration/alembic_migrations/versions/002_add_project_lock_table.py new file mode 100644 index 0000000..40ef64b --- /dev/null +++ b/distil/db/migration/alembic_migrations/versions/002_add_project_lock_table.py @@ -0,0 +1,39 @@ +# Copyright 2017 OpenStack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""add-project-lock-table + +Revision ID: 002 +Revises: 001 +Create Date: 2017-01-11 11:45:05.107888 + +""" + +# revision identifiers, used by Alembic. +revision = '002' +down_revision = '001' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.create_table( + 'project_locks', + sa.Column('created', sa.DateTime(), nullable=False), + sa.Column('project_id', sa.String(length=100), nullable=False), + sa.Column('owner', sa.String(length=100), nullable=False), + sa.PrimaryKeyConstraint('project_id'), + ) diff --git a/distil/db/sqlalchemy/api.py b/distil/db/sqlalchemy/api.py index b73dbc3..8059a64 100644 --- a/distil/db/sqlalchemy/api.py +++ b/distil/db/sqlalchemy/api.py @@ -15,21 +15,23 @@ """Implementation of SQLAlchemy backend.""" +import contextlib from datetime import datetime import json -import six import sys - -import sqlalchemy as sa -from sqlalchemy import func import threading from oslo_config import cfg from oslo_db import exception as db_exception from oslo_db.sqlalchemy import session as db_session from oslo_log import log as logging +from retrying import retry +import six +import sqlalchemy as sa +from sqlalchemy import func from distil.db.sqlalchemy import models as m +from distil.db.sqlalchemy.models import ProjectLock from distil.db.sqlalchemy.models import Resource from distil.db.sqlalchemy.models import Tenant from distil.db.sqlalchemy.models import UsageEntry @@ -76,7 +78,7 @@ def get_backend(): def setup_db(): try: engine = get_engine() - m.Cluster.metadata.create_all(engine) + m.Tenant.metadata.create_all(engine) except sa.exc.OperationalError as e: LOG.exception("Database registration exception: %s", e) return False @@ -226,8 +228,9 @@ def usages_add(project_id, resources, usage_entries, last_collect): for (id, res_info) in six.iteritems(resources): res_db = _get_resource(session, project_id, id) if res_db: - orig_info = json.loads(res_db.info) - res_db.info = json.dumps(orig_info.update(res_info)) + orig_info = json.loads(res_db.info) or {} + orig_info.update(res_info) + res_db.info = json.dumps(orig_info) else: resource_ref = Resource( id=id, @@ -306,3 +309,82 @@ def _merge_resource_metadata(md_dict, entry, md_def): pass return md_dict + + +def get_project_locks(project_id): + session = get_session() + + query = session.query(ProjectLock) + query = query.filter(ProjectLock.project_id == project_id) + + try: + return query.all() + except Exception as e: + raise exceptions.DBException( + "Failed when querying database, error type: %s, " + "error message: %s" % (e.__class__.__name__, str(e)) + ) + + +@retry(stop_max_attempt_number=3, wait_fixed=5000) +def create_project_lock(project_id, owner): + """Creates project lock record. + + This method has to work without SQLAlchemy session because session may not + immediately issue an SQL query to a database and instead just schedule it + whereas we need to make sure to issue a operation immediately. + + If there are more than 2 transactions trying to get same project lock + (although with little chance), after the first one's commit, only one of + the others will succeed to continue, all others will fail with exception. + Using retry mechanism here to avoid that happen. + """ + session = get_session() + session.flush() + + insert = ProjectLock.__table__.insert() + session.execute(insert.values(project_id=project_id, owner=owner, + created=datetime.utcnow())) + + session.flush() + + +def ensure_project_lock(project_id, owner): + """Make sure project lock record exists.""" + session = get_session() + + query = session.query(ProjectLock) + query = query.filter(ProjectLock.project_id == project_id, + ProjectLock.owner == owner) + + # If there is already lock existing for the process, do not recreate. This + # helps the process continue with the project it was handling before it was + # killed. + if not query.all(): + create_project_lock(project_id, owner) + + +def delete_project_lock(project_id): + """Deletes project lock record. + + This method has to work without SQLAlchemy session because session may not + immediately issue an SQL query to a database and instead just schedule it + whereas we need to make sure to issue a operation immediately. + """ + session = get_session() + session.flush() + + table = ProjectLock.__table__ + delete = table.delete() + session.execute(delete.where(table.c.project_id == project_id)) + + session.flush() + + +@contextlib.contextmanager +def project_lock(project_id, owner): + try: + ensure_project_lock(project_id, owner) + yield + finally: + delete_project_lock(project_id) diff --git a/distil/db/sqlalchemy/models.py b/distil/db/sqlalchemy/models.py index f59932f..b6466af 100644 --- a/distil/db/sqlalchemy/models.py +++ b/distil/db/sqlalchemy/models.py @@ -119,3 +119,11 @@ class SalesOrder(DistilBase): @hybrid_method def intersects(self, other): return (self.start <= other.end and other.start <= self.end) + + +class ProjectLock(DistilBase): + __tablename__ = 'project_locks' + + project_id = Column(String(100), primary_key=True, nullable=False) + owner = Column(String(100), nullable=False) + created = Column(DateTime, nullable=False) diff --git a/distil/service/collector.py b/distil/service/collector.py index 4289cd8..d796a01 100644 --- a/distil/service/collector.py +++ b/distil/service/collector.py @@ -22,6 +22,7 @@ from stevedore import driver from distil.db import api as db_api from distil import exceptions +from distil.common import general from distil.common import openstack LOG = logging.getLogger(__name__) @@ -53,6 +54,8 @@ class CollectorService(service.Service): self.validate_config() + self.identifier = general.get_process_identifier() + collector_args = {} self.collector = driver.DriverManager( 'distil.collector', @@ -93,15 +96,36 @@ class CollectorService(service.Service): logging.setup(CONF, 'distil-collector') def collect_usage(self): - LOG.info("Begin to collect usage for all projects...") + LOG.info("Starting to collect usage...") projects = filter_projects(openstack.get_projects()) end = datetime.utcnow().replace(minute=0, second=0, microsecond=0) + count = 0 for project in projects: - # Add a project or get last_collected of existing project. - db_project = db_api.project_add(project) - start = db_project.last_collected + # Check if the project is being processed by other collector + # instance. If no, will get a lock and continue processing, + # otherwise just skip it. + locks = db_api.get_project_locks(project['id']) - self.collector.collect_usage(project, start, end) + if locks and locks[0].owner != self.identifier: + LOG.debug( + "Project %s is being processed by collector %s." % + (project['id'], locks[0].owner) + ) + continue + + try: + with db_api.project_lock(project['id'], self.identifier): + # Add a project or get last_collected of existing project. + db_project = db_api.project_add(project) + start = db_project.last_collected + + self.collector.collect_usage(project, start, end) + + count = count + 1 + except Exception: + LOG.warning('Get lock failed. Process: %s' % self.identifier) + + LOG.info("Finished collecting usage for %s projects." % count) diff --git a/distil/tests/unit/base.py b/distil/tests/unit/base.py index bfad63b..3186483 100644 --- a/distil/tests/unit/base.py +++ b/distil/tests/unit/base.py @@ -22,6 +22,7 @@ from oslo_log import log from distil import context from distil import config +from distil.db import api as db_api class DistilTestCase(base.BaseTestCase): @@ -34,7 +35,7 @@ class DistilTestCase(base.BaseTestCase): if self.config_file: self.conf = self.load_conf(self.config_file) else: - self.conf = cfg.ConfigOpts() + self.conf = cfg.CONF self.conf.register_opts(config.DEFAULT_OPTIONS) self.conf.register_opts(config.ODOO_OPTS, group=config.ODOO_GROUP) @@ -72,12 +73,12 @@ class DistilTestCase(base.BaseTestCase): :returns: Project's config object. """ - conf = cfg.ConfigOpts() + conf = cfg.CONF log.register_options(conf) conf(args=[], default_config_files=[cls.conf_path(filename)]) return conf - def config(self, group=None, **kw): + def override_config(self, group=None, **kw): """Override some configuration values. The keyword arguments are the names of configuration options to @@ -100,6 +101,6 @@ class DistilWithDbTestCase(DistilTestCase): def setUp(self): super(DistilWithDbTestCase, self).setUp() - self.override_config('connection', "sqlite://", group='database') + self.conf.set_default('connection', 'sqlite://', group='database') db_api.setup_db() self.addCleanup(db_api.drop_db) diff --git a/distil/tests/unit/db/__init__.py b/distil/tests/unit/db/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/distil/tests/unit/db/test_sqlalchemy_db_api.py b/distil/tests/unit/db/test_sqlalchemy_db_api.py new file mode 100644 index 0000000..7a74eae --- /dev/null +++ b/distil/tests/unit/db/test_sqlalchemy_db_api.py @@ -0,0 +1,29 @@ +# Copyright (C) 2017 Catalyst IT Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from distil.db.sqlalchemy import api as db_api +from distil.tests.unit import base + + +class ProjectLockTest(base.DistilWithDbTestCase): + def test_with_project_lock(self): + project_id = 'fake_project_id' + owner = 'fake_owner' + + with db_api.project_lock(project_id, owner): + # Make sure that within 'with' section the lock record exists. + self.assertEqual(1, len(db_api.get_project_locks(project_id))) + + # Make sure that outside 'with' section the lock record does not exist. + self.assertEqual(0, len(db_api.get_project_locks(project_id))) diff --git a/requirements.txt b/requirements.txt index d17e939..e1e40ea 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,6 +11,7 @@ odoorpc==0.4.2 SQLAlchemy<1.1.0,>=1.0.10 # MIT keystonemiddleware!=4.1.0,>=4.0.0 # Apache-2.0 keystoneauth1>=2.1.0 # Apache-2.0 +retrying>=1.2.3,!=1.3.0 # Apache-2.0 python-cinderclient>=1.6.0 # Apache-2.0 python-keystoneclient!=1.8.0,!=2.1.0,>=1.6.0 # Apache-2.0 diff --git a/tox.ini b/tox.ini index 30a2741..28df0d5 100644 --- a/tox.ini +++ b/tox.ini @@ -11,10 +11,14 @@ setenv = DISTIL_TESTS_CONFIGS_DIR={toxinidir}/distil/tests/etc/ DISCOVER_DIRECTORY=distil/tests/unit deps = - -r{toxinidir}/requirements.txt -r{toxinidir}/test-requirements.txt -commands = python setup.py testr --slowest --testr-args="{posargs}" -whitelist_externals = bash +commands = + find . -type f -name "*.pyc" -delete + python setup.py testr --slowest --testr-args="{posargs}" +whitelist_externals = + bash + rm + find [testenv:py33] deps = -r{toxinidir}/requirements-py3.txt