From de1ab6d96b2e6a7e3e1fa02a245a795a9a44a538 Mon Sep 17 00:00:00 2001 From: Erik Olof Gunnar Andersson Date: Thu, 28 Dec 2023 11:17:56 -0800 Subject: [PATCH] Fix SQLAlchemy 2.x support Re-arranged session handling so that sessions are always opened at the top level, making it easier to know where the current session originated. This is important now that autocommit is no longer enabled. - Added Zuul jobs testing SQLAlchemy 2.x. - Added a new DB API for service cleanup. - Removed the broken SQLite cleanup step used during testing. Change-Id: I168f3d9518611ac66cb9eec1132a7add19e92d5f --- .zuul.yaml | 36 +- devstack/lib/senlin | 5 +- ...d-for-SQLAlchemy-2.x-ee6831e5a95d3658.yaml | 4 + senlin/cmd/status.py | 13 +- senlin/common/service.py | 24 +- senlin/db/api.py | 4 +- senlin/db/sqlalchemy/api.py | 751 ++++++++++-------- senlin/objects/service.py | 5 +- senlin/tests/unit/common/base.py | 7 +- senlin/tests/unit/conductor/test_service.py | 31 +- senlin/tests/unit/db/test_service_api.py | 45 +- 11 files changed, 503 insertions(+), 422 deletions(-) create mode 100644 releasenotes/notes/Updated-for-SQLAlchemy-2.x-ee6831e5a95d3658.yaml diff --git a/.zuul.yaml b/.zuul.yaml index dc01f2507..458a626d8 100644 --- a/.zuul.yaml +++ b/.zuul.yaml @@ -8,8 +8,10 @@ check: jobs: - senlin-dsvm-tempest-py3-api + - senlin-dsvm-tempest-py3-api-sqlalchemy-2x - senlin-tempest-api-ipv6-only - senlin-dsvm-tempest-py3-functional + - senlin-dsvm-tempest-py3-functional-sqlalchemy-2x - senlin-dsvm-tempest-py3-integration - senlin-dsvm-tempest-py3-integration-zaqar: voting: false @@ -65,6 +67,22 @@ DEFAULT: cloud_backend: openstack_test +- job: + name: senlin-dsvm-tempest-py3-api-sqlalchemy-2x + parent: senlin-tempest-base + required-projects: + - name: openstack/oslo.db + vars: + tempest_test_regex: senlin_tempest_plugin.tests.api + devstack_localrc: + USE_PYTHON3: true + USE_SQLALCHEMY_LATEST: true + devstack_local_conf: + post-config: + $SENLIN_CONF: + DEFAULT: + cloud_backend: openstack_test + - job: name: senlin-dsvm-tempest-py3-functional parent: senlin-tempest-base @@ -79,6 +97,23 @@ cloud_backend: openstack_test health_check_interval_min: 10 +- job: + name: senlin-dsvm-tempest-py3-functional-sqlalchemy-2x + parent: senlin-tempest-base + required-projects: + - name: openstack/oslo.db + vars: + tempest_test_regex: senlin_tempest_plugin.tests.functional + devstack_localrc: + USE_PYTHON3: true + USE_SQLALCHEMY_LATEST: true + devstack_local_conf: + post-config: + $SENLIN_CONF: + DEFAULT: + cloud_backend: openstack_test + health_check_interval_min: 10 + - job: name: senlin-dsvm-tempest-py3-integration parent: senlin-tempest-base @@ -143,4 +178,3 @@ $SENLIN_CONF: DEFAULT: cloud_backend: openstack_test - diff --git a/devstack/lib/senlin b/devstack/lib/senlin index c6211d6e8..97ccfbd6e 100644 --- a/devstack/lib/senlin +++ b/devstack/lib/senlin @@ -196,10 +196,13 @@ function _config_senlin_apache_wsgi { # init_senlin() - Initialize database function init_senlin { - # (re)create senlin database recreate_database senlin utf8 + if [[ "$USE_SQLALCHEMY_LATEST" == "True" ]]; then + pip3 install --upgrade alembic sqlalchemy + fi + $SENLIN_BIN_DIR/senlin-manage db_sync create_senlin_cache_dir } diff --git a/releasenotes/notes/Updated-for-SQLAlchemy-2.x-ee6831e5a95d3658.yaml b/releasenotes/notes/Updated-for-SQLAlchemy-2.x-ee6831e5a95d3658.yaml new file mode 100644 index 000000000..82d8b96f9 --- /dev/null +++ b/releasenotes/notes/Updated-for-SQLAlchemy-2.x-ee6831e5a95d3658.yaml @@ -0,0 +1,4 @@ +--- +fixes: + - | + Fixed compatibility issues with SQLAlchemy 2.x.
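As a rough, self-contained sketch (not part of the patch) of the session pattern this change standardizes on: query helpers receive an explicit session, and the caller opens the enginefacade transaction at the top level. The Item model, the plain RequestContext stand-in, and the in-memory SQLite URL are illustrative assumptions; senlin's real code wraps the same pattern in its session_for_read() and session_for_write() helpers.

from oslo_db.sqlalchemy import enginefacade
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Item(Base):
    __tablename__ = 'item'
    id = Column(Integer, primary_key=True)
    name = Column(String(64))


class RequestContext(object):
    """Bare-bones stand-in for an oslo.context RequestContext."""


ctx_manager = enginefacade.transaction_context()
ctx_manager.configure(connection='sqlite://')  # in-memory DB for the sketch
Base.metadata.create_all(ctx_manager.writer.get_engine())


def item_model_query(session):
    # Helpers take the session as an argument instead of opening their
    # own, so the transaction boundary stays visible at the call site.
    return session.query(Item)


context = RequestContext()

with ctx_manager.writer.using(context) as session:
    session.add(Item(name='node-1'))

with ctx_manager.reader.using(context) as session:
    # Results are built while the session is still open; with autocommit
    # gone in SQLAlchemy 2.x, nothing may leak past the transaction.
    names = [item.name for item in item_model_query(session).all()]

assert names == ['node-1']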
diff --git a/senlin/cmd/status.py b/senlin/cmd/status.py index 1fd9097fd..2d3bde105 100644 --- a/senlin/cmd/status.py +++ b/senlin/cmd/status.py @@ -20,6 +20,7 @@ from oslo_upgradecheck import upgradecheck from senlin.common.i18n import _ from senlin.db import api +from senlin.db.sqlalchemy import api as sql_api from sqlalchemy import MetaData, Table, select, column @@ -42,15 +43,19 @@ class Checks(upgradecheck.UpgradeCommands): """ engine = api.get_engine() - metadata = MetaData(bind=engine) - policy = Table('policy', metadata, autoload=True) + metadata = MetaData() + metadata.bind = engine + + policy = Table('policy', metadata, autoload_with=engine) healthpolicy_select = ( - select([column('name')]) + select(column('name')) .select_from(policy) .where(column('type') == 'senlin.policy.health-1.0') ) - healthpolicy_rows = engine.execute(healthpolicy_select).fetchall() + + with sql_api.session_for_read() as session: + healthpolicy_rows = session.execute(healthpolicy_select).fetchall() if not healthpolicy_rows: return upgradecheck.Result(upgradecheck.Code.SUCCESS) diff --git a/senlin/common/service.py b/senlin/common/service.py index f627d893c..29b635a7f 100644 --- a/senlin/common/service.py +++ b/senlin/common/service.py @@ -90,29 +90,7 @@ class Service(service.Service): def service_manage_cleanup(self): self.cleanup_count += 1 try: - ctx = senlin_context.get_admin_context() - services = service_obj.Service.get_all_expired( - ctx, self.name - ) - for svc in services: - LOG.info( - 'Breaking locks for dead service %(name)s ' - '(id: %(service_id)s)', - { - 'name': self.name, - 'service_id': svc['id'], - } - ) - service_obj.Service.gc_by_engine(svc['id']) - LOG.info( - 'Done breaking locks for service %(name)s ' - '(id: %(service_id)s)', - { - 'name': self.name, - 'service_id': svc['id'], - } - ) - service_obj.Service.delete(svc['id']) + service_obj.Service.cleanup_all_expired(self.name) except Exception as ex: LOG.error( 'Error while cleaning up service %(name)s: %(ex)s', diff --git a/senlin/db/api.py b/senlin/db/api.py index 74fca573c..0cec1e244 100755 --- a/senlin/db/api.py +++ b/senlin/db/api.py @@ -486,8 +486,8 @@ def service_get_all(): return IMPL.service_get_all() -def service_get_all_expired(binary): - return IMPL.service_get_all_expired(binary) +def service_cleanup_all_expired(binary): + return IMPL.service_cleanup_all_expired(binary) def gc_by_engine(engine_id): diff --git a/senlin/db/sqlalchemy/api.py b/senlin/db/sqlalchemy/api.py index e02902c9d..028bb7010 100755 --- a/senlin/db/sqlalchemy/api.py +++ b/senlin/db/sqlalchemy/api.py @@ -75,7 +75,6 @@ def _get_main_context_manager(): if not _MAIN_CONTEXT_MANAGER: initialize() _MAIN_CONTEXT_MANAGER = enginefacade.transaction_context() - _MAIN_CONTEXT_MANAGER.configure(__autocommit=True) return _MAIN_CONTEXT_MANAGER @@ -129,9 +128,9 @@ def retry_on_deadlock(f): max_retry_interval=CONF.database_max_retry_interval)(f) -def query_by_short_id(context, model_query, model, short_id, +def query_by_short_id(session, context, model_query, model, short_id, project_safe=True): - q = model_query() + q = model_query(session) q = q.filter(model.id.like('%s%%' % short_id)) q = utils.filter_query_by_project(q, project_safe, context) @@ -143,8 +142,8 @@ def query_by_short_id(context, model_query, model, short_id, raise exception.MultipleChoices(arg=short_id) -def query_by_name(context, model_query, name, project_safe=True): - q = model_query() +def query_by_name(session, context, model_query, name, project_safe=True): + q = model_query(session) q = 
q.filter_by(name=name) q = utils.filter_query_by_project(q, project_safe, context) @@ -157,14 +156,13 @@ def query_by_name(context, model_query, name, project_safe=True): # Clusters -def cluster_model_query(): - with session_for_read() as session: - query = session.query(models.Cluster).options( - joinedload(models.Cluster.nodes), - joinedload(models.Cluster.profile), - joinedload(models.Cluster.policies) - ) - return query +def cluster_model_query(session): + query = session.query(models.Cluster).options( + joinedload(models.Cluster.nodes), + joinedload(models.Cluster.profile), + joinedload(models.Cluster.policies) + ) + return query @retry_on_deadlock @@ -177,26 +175,30 @@ def cluster_create(context, values): def cluster_get(context, cluster_id, project_safe=True): - cluster = cluster_model_query().get(cluster_id) + with session_for_read() as session: + cluster = cluster_model_query(session).get(cluster_id) - if cluster is None: - return None + if cluster is None: + return None - return utils.check_resource_project(context, cluster, project_safe) + return utils.check_resource_project(context, cluster, project_safe) def cluster_get_by_name(context, name, project_safe=True): - return query_by_name(context, cluster_model_query, name, - project_safe=project_safe) + with session_for_read() as session: + return query_by_name(session, context, cluster_model_query, name, + project_safe=project_safe) def cluster_get_by_short_id(context, short_id, project_safe=True): - return query_by_short_id(context, cluster_model_query, models.Cluster, - short_id, project_safe=project_safe) + with session_for_read() as session: + return query_by_short_id(session, context, cluster_model_query, + models.Cluster, + short_id, project_safe=project_safe) -def _query_cluster_get_all(context, project_safe=True): - query = cluster_model_query() +def _query_cluster_get_all(session, context, project_safe=True): + query = cluster_model_query(session) query = utils.filter_query_by_project(query, project_safe, context) return query @@ -204,16 +206,18 @@ def _query_cluster_get_all(context, project_safe=True): def cluster_get_all(context, limit=None, marker=None, sort=None, filters=None, project_safe=True): - query = _query_cluster_get_all(context, project_safe=project_safe) - if filters: - query = utils.exact_filter(query, models.Cluster, filters) + with session_for_read() as session: + query = _query_cluster_get_all(session, context, + project_safe=project_safe) + if filters: + query = utils.exact_filter(query, models.Cluster, filters) - keys, dirs = utils.get_sort_params(sort, consts.CLUSTER_INIT_AT) - if marker: - marker = cluster_model_query().get(marker) + keys, dirs = utils.get_sort_params(sort, consts.CLUSTER_INIT_AT) + if marker: + marker = cluster_model_query(session).get(marker) - return sa_utils.paginate_query(query, models.Cluster, limit, keys, - marker=marker, sort_dirs=dirs).all() + return sa_utils.paginate_query(query, models.Cluster, limit, keys, + marker=marker, sort_dirs=dirs).all() @retry_on_deadlock @@ -231,9 +235,11 @@ def cluster_next_index(context, cluster_id): def cluster_count_all(context, filters=None, project_safe=True): - query = _query_cluster_get_all(context, project_safe=project_safe) - query = utils.exact_filter(query, models.Cluster, filters) - return query.count() + with session_for_read() as session: + query = _query_cluster_get_all(session, context, + project_safe=project_safe) + query = utils.exact_filter(query, models.Cluster, filters) + return query.count() @retry_on_deadlock @@ -272,12 
+278,11 @@ def cluster_delete(context, cluster_id): # Nodes -def node_model_query(): - with session_for_read() as session: - query = session.query(models.Node).options( - joinedload(models.Node.profile) - ) - return query +def node_model_query(session): + query = session.query(models.Node).options( + joinedload(models.Node.profile) + ) + return query @retry_on_deadlock @@ -291,25 +296,29 @@ def node_create(context, values): def node_get(context, node_id, project_safe=True): - node = node_model_query().get(node_id) - if not node: - return None + with session_for_read() as session: + node = node_model_query(session).get(node_id) + if not node: + return None - return utils.check_resource_project(context, node, project_safe) + return utils.check_resource_project(context, node, project_safe) def node_get_by_name(context, name, project_safe=True): - return query_by_name(context, node_model_query, name, - project_safe=project_safe) - - -def node_get_by_short_id(context, short_id, project_safe=True): - return query_by_short_id(context, node_model_query, models.Node, short_id, + with session_for_read() as session: + return query_by_name(session, context, node_model_query, name, project_safe=project_safe) -def _query_node_get_all(context, project_safe=True, cluster_id=None): - query = node_model_query() +def node_get_by_short_id(context, short_id, project_safe=True): + with session_for_read() as session: + return query_by_short_id(session, context, node_model_query, + models.Node, short_id, + project_safe=project_safe) + + +def _query_node_get_all(session, context, project_safe=True, cluster_id=None): + query = node_model_query(session) if cluster_id is not None: query = query.filter_by(cluster_id=cluster_id) @@ -321,28 +330,30 @@ def _query_node_get_all(context, project_safe=True, cluster_id=None): def node_get_all(context, cluster_id=None, limit=None, marker=None, sort=None, filters=None, project_safe=True): - query = _query_node_get_all(context, project_safe=project_safe, - cluster_id=cluster_id) + with session_for_read() as session: + query = _query_node_get_all(session, context, + project_safe=project_safe, + cluster_id=cluster_id) - if filters: - query = utils.exact_filter(query, models.Node, filters) + if filters: + query = utils.exact_filter(query, models.Node, filters) - keys, dirs = utils.get_sort_params(sort, consts.NODE_INIT_AT) - if marker: - marker = node_model_query().get(marker) - return sa_utils.paginate_query(query, models.Node, limit, keys, - marker=marker, sort_dirs=dirs).all() + keys, dirs = utils.get_sort_params(sort, consts.NODE_INIT_AT) + if marker: + marker = node_model_query(session).get(marker) + return sa_utils.paginate_query(query, models.Node, limit, keys, + marker=marker, sort_dirs=dirs).all() def node_get_all_by_cluster(context, cluster_id, filters=None, project_safe=True): + with session_for_read() as session: + query = _query_node_get_all(session, context, cluster_id=cluster_id, + project_safe=project_safe) + if filters: + query = utils.exact_filter(query, models.Node, filters) - query = _query_node_get_all(context, cluster_id=cluster_id, - project_safe=project_safe) - if filters: - query = utils.exact_filter(query, models.Node, filters) - - return query.all() + return query.all() def node_ids_by_cluster(context, cluster_id, filters=None): @@ -356,13 +367,14 @@ def node_ids_by_cluster(context, cluster_id, filters=None): def node_count_by_cluster(context, cluster_id, **kwargs): - project_safe = kwargs.pop('project_safe', True) - query = node_model_query() - query = 
query.filter_by(cluster_id=cluster_id) - query = query.filter_by(**kwargs) - query = utils.filter_query_by_project(query, project_safe, context) + with session_for_read() as session: + project_safe = kwargs.pop('project_safe', True) + query = node_model_query(session) + query = query.filter_by(cluster_id=cluster_id) + query = query.filter_by(**kwargs) + query = utils.filter_query_by_project(query, project_safe, context) - return query.count() + return query.count() @retry_on_deadlock @@ -617,12 +629,11 @@ def node_lock_steal(node_id, action_id): # Policies -def policy_model_query(): - with session_for_read() as session: - query = session.query(models.Policy).options( - joinedload(models.Policy.bindings) - ) - return query +def policy_model_query(session): + query = session.query(models.Policy).options( + joinedload(models.Policy.bindings) + ) + return query @retry_on_deadlock @@ -635,38 +646,43 @@ def policy_create(context, values): def policy_get(context, policy_id, project_safe=True): - policy = policy_model_query() - policy = policy.filter_by(id=policy_id).first() + with session_for_read() as session: + policy = policy_model_query(session) + policy = policy.filter_by(id=policy_id).first() - if policy is None: - return None + if policy is None: + return None - return utils.check_resource_project(context, policy, project_safe) + return utils.check_resource_project(context, policy, project_safe) def policy_get_by_name(context, name, project_safe=True): - return query_by_name(context, policy_model_query, name, - project_safe=project_safe) + with session_for_read() as session: + return query_by_name(session, context, policy_model_query, name, + project_safe=project_safe) def policy_get_by_short_id(context, short_id, project_safe=True): - return query_by_short_id(context, policy_model_query, models.Policy, - short_id, project_safe=project_safe) + with session_for_read() as session: + return query_by_short_id(session, context, policy_model_query, + models.Policy, + short_id, project_safe=project_safe) def policy_get_all(context, limit=None, marker=None, sort=None, filters=None, project_safe=True): - query = policy_model_query() - query = utils.filter_query_by_project(query, project_safe, context) + with session_for_read() as session: + query = policy_model_query(session) + query = utils.filter_query_by_project(query, project_safe, context) - if filters: - query = utils.exact_filter(query, models.Policy, filters) + if filters: + query = utils.exact_filter(query, models.Policy, filters) - keys, dirs = utils.get_sort_params(sort, consts.POLICY_CREATED_AT) - if marker: - marker = policy_model_query().get(marker) - return sa_utils.paginate_query(query, models.Policy, limit, keys, - marker=marker, sort_dirs=dirs).all() + keys, dirs = utils.get_sort_params(sort, consts.POLICY_CREATED_AT) + if marker: + marker = policy_model_query(session).get(marker) + return sa_utils.paginate_query(query, models.Policy, limit, keys, + marker=marker, sort_dirs=dirs).all() @retry_on_deadlock @@ -697,17 +713,17 @@ def policy_delete(context, policy_id): # Cluster-Policy Associations -def cluster_policy_model_query(): - with session_for_read() as session: - query = session.query(models.ClusterPolicies) - return query +def cluster_policy_model_query(session): + query = session.query(models.ClusterPolicies) + return query def cluster_policy_get(context, cluster_id, policy_id): - query = cluster_policy_model_query() - bindings = query.filter_by(cluster_id=cluster_id, - policy_id=policy_id) - return bindings.first() + with 
session_for_read() as session: + query = cluster_policy_model_query(session) + bindings = query.filter_by(cluster_id=cluster_id, + policy_id=policy_id) + return bindings.first() def cluster_policy_get_all(context, cluster_id, filters=None, sort=None): @@ -735,8 +751,8 @@ def cluster_policy_get_all(context, cluster_id, filters=None, sort=None): models.Policy.name == filters[key_name]) keys, dirs = utils.get_sort_params(sort) - return sa_utils.paginate_query(query, models.ClusterPolicies, None, - keys, sort_dirs=dirs).all() + return sa_utils.paginate_query(query, models.ClusterPolicies, None, + keys, sort_dirs=dirs).all() def cluster_policy_ids_by_cluster(context, cluster_id): @@ -748,35 +764,37 @@ def cluster_policy_ids_by_cluster(context, cluster_id): def cluster_policy_get_by_type(context, cluster_id, policy_type, filters=None): + with session_for_read() as session: + query = cluster_policy_model_query(session) + query = query.filter_by(cluster_id=cluster_id) - query = cluster_policy_model_query() - query = query.filter_by(cluster_id=cluster_id) + key_enabled = consts.CP_ENABLED + if filters and key_enabled in filters: + filter_enabled = {key_enabled: filters[key_enabled]} + query = utils.exact_filter(query, models.ClusterPolicies, + filter_enabled) - key_enabled = consts.CP_ENABLED - if filters and key_enabled in filters: - filter_enabled = {key_enabled: filters[key_enabled]} - query = utils.exact_filter(query, models.ClusterPolicies, - filter_enabled) + query = query.join(models.Policy).filter( + models.Policy.type == policy_type) - query = query.join(models.Policy).filter(models.Policy.type == policy_type) - - return query.all() + return query.all() def cluster_policy_get_by_name(context, cluster_id, policy_name, filters=None): + with session_for_read() as session: + query = cluster_policy_model_query(session) + query = query.filter_by(cluster_id=cluster_id) - query = cluster_policy_model_query() - query = query.filter_by(cluster_id=cluster_id) + key_enabled = consts.CP_ENABLED + if filters and key_enabled in filters: + filter_enabled = {key_enabled: filters[key_enabled]} + query = utils.exact_filter(query, models.ClusterPolicies, + filter_enabled) - key_enabled = consts.CP_ENABLED - if filters and key_enabled in filters: - filter_enabled = {key_enabled: filters[key_enabled]} - query = utils.exact_filter(query, models.ClusterPolicies, - filter_enabled) + query = query.join(models.Policy).filter( + models.Policy.name == policy_name) - query = query.join(models.Policy).filter(models.Policy.name == policy_name) - - return query.all() + return query.all() @retry_on_deadlock @@ -862,10 +880,9 @@ def cluster_remove_dependents(context, cluster_id, profile_id): # Profiles -def profile_model_query(): - with session_for_read() as session: - query = session.query(models.Profile) - return query +def profile_model_query(session): + query = session.query(models.Profile) + return query @retry_on_deadlock @@ -878,38 +895,43 @@ def profile_create(context, values): def profile_get(context, profile_id, project_safe=True): - query = profile_model_query() - profile = query.get(profile_id) + with session_for_read() as session: + query = profile_model_query(session) + profile = query.get(profile_id) - if profile is None: - return None + if profile is None: + return None - return utils.check_resource_project(context, profile, project_safe) + return utils.check_resource_project(context, profile, project_safe) def profile_get_by_name(context, name, project_safe=True): - return query_by_name(context, 
profile_model_query, name, - project_safe=project_safe) + with session_for_read() as session: + return query_by_name(session, context, profile_model_query, name, + project_safe=project_safe) def profile_get_by_short_id(context, short_id, project_safe=True): - return query_by_short_id(context, profile_model_query, models.Profile, - short_id, project_safe=project_safe) + with session_for_read() as session: + return query_by_short_id(session, context, profile_model_query, + models.Profile, + short_id, project_safe=project_safe) def profile_get_all(context, limit=None, marker=None, sort=None, filters=None, project_safe=True): - query = profile_model_query() - query = utils.filter_query_by_project(query, project_safe, context) + with session_for_read() as session: + query = profile_model_query(session) + query = utils.filter_query_by_project(query, project_safe, context) - if filters: - query = utils.exact_filter(query, models.Profile, filters) + if filters: + query = utils.exact_filter(query, models.Profile, filters) - keys, dirs = utils.get_sort_params(sort, consts.PROFILE_CREATED_AT) - if marker: - marker = profile_model_query().get(marker) - return sa_utils.paginate_query(query, models.Profile, limit, keys, - marker=marker, sort_dirs=dirs).all() + keys, dirs = utils.get_sort_params(sort, consts.PROFILE_CREATED_AT) + if marker: + marker = profile_model_query(session).get(marker) + return sa_utils.paginate_query(query, models.Profile, limit, keys, + marker=marker, sort_dirs=dirs).all() @retry_on_deadlock @@ -945,10 +967,9 @@ def profile_delete(context, profile_id): # Credentials -def credential_model_query(): - with session_for_read() as session: - query = session.query(models.Credential) - return query +def credential_model_query(session): + query = session.query(models.Credential) + return query @retry_on_deadlock @@ -961,7 +982,8 @@ def cred_create(context, values): def cred_get(context, user, project): - return credential_model_query().get((user, project)) + with session_for_read() as session: + return credential_model_query(session).get((user, project)) @retry_on_deadlock @@ -993,12 +1015,11 @@ def cred_create_update(context, values): # Events -def event_model_query(): - with session_for_read() as session: - query = session.query(models.Event).options( - joinedload(models.Event.cluster) - ) - return query +def event_model_query(session): + query = session.query(models.Event).options( + joinedload(models.Event.cluster) + ) + return query @retry_on_deadlock @@ -1012,53 +1033,63 @@ def event_create(context, values): @retry_on_deadlock def event_get(context, event_id, project_safe=True): - event = event_model_query().get(event_id) - return utils.check_resource_project(context, event, project_safe) + with session_for_read() as session: + event = event_model_query(session).get(event_id) + return utils.check_resource_project(context, event, project_safe) def event_get_by_short_id(context, short_id, project_safe=True): - return query_by_short_id(context, event_model_query, models.Event, - short_id, project_safe=project_safe) + with session_for_read() as session: + return query_by_short_id(session, context, event_model_query, + models.Event, + short_id, project_safe=project_safe) -def _event_filter_paginate_query(context, query, filters=None, +def _event_filter_paginate_query(session, context, query, filters=None, limit=None, marker=None, sort=None): if filters: query = utils.exact_filter(query, models.Event, filters) keys, dirs = utils.get_sort_params(sort, consts.EVENT_TIMESTAMP) if marker: - 
marker = event_model_query().get(marker) + marker = event_model_query(session).get(marker) return sa_utils.paginate_query(query, models.Event, limit, keys, marker=marker, sort_dirs=dirs).all() def event_get_all(context, limit=None, marker=None, sort=None, filters=None, project_safe=True): - query = event_model_query() - query = utils.filter_query_by_project(query, project_safe, context) + with session_for_read() as session: + query = event_model_query(session) + query = utils.filter_query_by_project(query, project_safe, context) - return _event_filter_paginate_query(context, query, filters=filters, - limit=limit, marker=marker, sort=sort) + return _event_filter_paginate_query(session, context, query, + filters=filters, + limit=limit, marker=marker, + sort=sort) def event_count_by_cluster(context, cluster_id, project_safe=True): - query = event_model_query() - query = utils.filter_query_by_project(query, project_safe, context) + with session_for_read() as session: + query = event_model_query(session) + query = utils.filter_query_by_project(query, project_safe, context) - count = query.filter_by(cluster_id=cluster_id).count() + count = query.filter_by(cluster_id=cluster_id).count() - return count + return count def event_get_all_by_cluster(context, cluster_id, limit=None, marker=None, sort=None, filters=None, project_safe=True): - query = event_model_query() - query = query.filter_by(cluster_id=cluster_id) - query = utils.filter_query_by_project(query, project_safe, context) + with session_for_read() as session: + query = event_model_query(session) + query = query.filter_by(cluster_id=cluster_id) + query = utils.filter_query_by_project(query, project_safe, context) - return _event_filter_paginate_query(context, query, filters=filters, - limit=limit, marker=marker, sort=sort) + return _event_filter_paginate_query(session, context, query, + filters=filters, + limit=limit, marker=marker, + sort=sort) @retry_on_deadlock @@ -1091,13 +1122,12 @@ def event_purge(project, granularity='days', age=30): # Actions -def action_model_query(): - with session_for_read() as session: - query = session.query(models.Action).options( - joinedload(models.Action.dep_on), - joinedload(models.Action.dep_by) - ) - return query +def action_model_query(session): + query = session.query(models.Action).options( + joinedload(models.Action.dep_on), + joinedload(models.Action.dep_by) + ) + return query @retry_on_deadlock @@ -1106,6 +1136,7 @@ def action_create(context, values): action = models.Action() action.update(values) session.add(action) + return action_get(context, action.id) @@ -1121,73 +1152,81 @@ def action_update(context, action_id, values): def action_get(context, action_id, project_safe=True, refresh=False): - action = action_model_query().get(action_id) - if action is None: - return None + with session_for_read() as session: + action = action_model_query(session).get(action_id) + if action is None: + return None - return utils.check_resource_project(context, action, project_safe) + return utils.check_resource_project(context, action, project_safe) def action_list_active_scaling(context, cluster_id=None, project_safe=True): - query = action_model_query() - query = utils.filter_query_by_project(query, project_safe, context) + with session_for_read() as session: + query = action_model_query(session) + query = utils.filter_query_by_project(query, project_safe, context) - if cluster_id: - query = query.filter_by(target=cluster_id) - query = query.filter( - models.Action.status.in_( - [consts.ACTION_READY, - 
consts.ACTION_WAITING, - consts.ACTION_RUNNING, - consts.ACTION_WAITING_LIFECYCLE_COMPLETION])) - query = query.filter( - models.Action.action.in_(consts.CLUSTER_SCALE_ACTIONS)) - scaling_actions = query.all() - return scaling_actions + if cluster_id: + query = query.filter_by(target=cluster_id) + query = query.filter( + models.Action.status.in_( + [consts.ACTION_READY, + consts.ACTION_WAITING, + consts.ACTION_RUNNING, + consts.ACTION_WAITING_LIFECYCLE_COMPLETION])) + query = query.filter( + models.Action.action.in_(consts.CLUSTER_SCALE_ACTIONS)) + scaling_actions = query.all() + return scaling_actions def action_get_by_name(context, name, project_safe=True): - return query_by_name(context, action_model_query, name, - project_safe=project_safe) + with session_for_read() as session: + return query_by_name(session, context, action_model_query, name, + project_safe=project_safe) def action_get_by_short_id(context, short_id, project_safe=True): - return query_by_short_id(context, action_model_query, models.Action, - short_id, project_safe=project_safe) + with session_for_read() as session: + return query_by_short_id(session, context, action_model_query, + models.Action, + short_id, project_safe=project_safe) def action_get_all_by_owner(context, owner_id): - query = action_model_query().filter_by(owner=owner_id) - return query.all() + with session_for_read() as session: + query = action_model_query(session).filter_by(owner=owner_id) + return query.all() def action_get_all_active_by_target(context, target_id, project_safe=True): - query = action_model_query() - query = utils.filter_query_by_project(query, project_safe, context) - query = query.filter_by(target=target_id) - query = query.filter( - models.Action.status.in_( - [consts.ACTION_READY, - consts.ACTION_WAITING, - consts.ACTION_RUNNING, - consts.ACTION_WAITING_LIFECYCLE_COMPLETION])) - actions = query.all() - return actions + with session_for_read() as session: + query = action_model_query(session) + query = utils.filter_query_by_project(query, project_safe, context) + query = query.filter_by(target=target_id) + query = query.filter( + models.Action.status.in_( + [consts.ACTION_READY, + consts.ACTION_WAITING, + consts.ACTION_RUNNING, + consts.ACTION_WAITING_LIFECYCLE_COMPLETION])) + actions = query.all() + return actions def action_get_all(context, filters=None, limit=None, marker=None, sort=None, project_safe=True): - query = action_model_query() - query = utils.filter_query_by_project(query, project_safe, context) + with session_for_read() as session: + query = action_model_query(session) + query = utils.filter_query_by_project(query, project_safe, context) - if filters: - query = utils.exact_filter(query, models.Action, filters) + if filters: + query = utils.exact_filter(query, models.Action, filters) - keys, dirs = utils.get_sort_params(sort, consts.ACTION_CREATED_AT) - if marker: - marker = action_model_query().get(marker) - return sa_utils.paginate_query(query, models.Action, limit, keys, - marker=marker, sort_dirs=dirs).all() + keys, dirs = utils.get_sort_params(sort, consts.ACTION_CREATED_AT) + if marker: + marker = action_model_query(session).get(marker) + return sa_utils.paginate_query(query, models.Action, limit, keys, + marker=marker, sort_dirs=dirs).all() @retry_on_deadlock @@ -1208,34 +1247,36 @@ def action_check_status(context, action_id, timestamp): return action.status -def action_dependency_model_query(): - with session_for_read() as session: - query = session.query(models.ActionDependency) - return query +def 
action_dependency_model_query(session): + query = session.query(models.ActionDependency) + return query @retry_on_deadlock def dependency_get_depended(context, action_id): - q = action_dependency_model_query().filter_by( - dependent=action_id) - return [d.depended for d in q.all()] + with session_for_read() as session: + q = action_dependency_model_query(session).filter_by( + dependent=action_id) + return [d.depended for d in q.all()] @retry_on_deadlock def dependency_get_dependents(context, action_id): - q = action_dependency_model_query().filter_by( - depended=action_id) - return [d.dependent for d in q.all()] + with session_for_read() as session: + q = action_dependency_model_query(session).filter_by( + depended=action_id) + return [d.dependent for d in q.all()] @retry_on_deadlock def dependency_add(context, depended, dependent): if isinstance(depended, list) and isinstance(dependent, list): raise exception.Error( - 'Multiple dependencies between lists not support') + 'Multiple dependencies between lists not supported' + ) with session_for_write() as session: - if isinstance(depended, list): # e.g. D depends on A,B,C + if isinstance(depended, list): # e.g. D depends on A,B,C for d in depended: r = models.ActionDependency(depended=d, dependent=dependent) session.add(r) @@ -1268,7 +1309,6 @@ def dependency_add(context, depended, dependent): @retry_on_deadlock def action_mark_succeeded(context, action_id, timestamp): with session_for_write() as session: - query = session.query(models.Action).filter_by(id=action_id) values = { 'owner': None, @@ -1286,7 +1326,6 @@ def action_mark_ready(context, action_id, timestamp): with session_for_write() as session: - query = session.query(models.Action).filter_by(id=action_id) values = { 'owner': None, @@ -1298,33 +1337,33 @@ @retry_on_deadlock -def _mark_failed(action_id, timestamp, reason=None): +def _mark_failed(session, action_id, timestamp, reason=None): # mark myself as failed - with session_for_write() as session: - query = session.query(models.Action).filter_by(id=action_id) - values = { - 'owner': None, - 'status': consts.ACTION_FAILED, - 'status_reason': (str(reason) if reason else - 'Action execution failed'), - 'end_time': timestamp, - } - query.update(values, synchronize_session=False) - action = query.all() + query = session.query(models.Action).filter_by(id=action_id) + values = { + 'owner': None, + 'status': consts.ACTION_FAILED, + 'status_reason': (str(reason) if reason else + 'Action execution failed'), + 'end_time': timestamp, + } + query.update(values, synchronize_session=False) + action = query.all() - query = session.query(models.ActionDependency) - query = query.filter_by(depended=action_id) - dependents = [d.dependent for d in query.all()] - query.delete(synchronize_session=False) + query = session.query(models.ActionDependency) + query = query.filter_by(depended=action_id) + dependents = [d.dependent for d in query.all()] + query.delete(synchronize_session=False) if parent_status_update_needed(action): for d in dependents: - _mark_failed(d, timestamp) + _mark_failed(session, d, timestamp) @retry_on_deadlock def action_mark_failed(context, action_id, timestamp, reason=None): - _mark_failed(action_id, timestamp, reason) + with session_for_write() as session: + _mark_failed(session, action_id, timestamp, reason) @retry_on_deadlock @@ -1429,14 +1468,15 @@ def action_abandon(context, action_id, 
values=None): @retry_on_deadlock def action_lock_check(context, action_id, owner=None): - action = action_model_query().get(action_id) - if not action: - raise exception.ResourceNotFound(type='action', id=action_id) + with session_for_read() as session: + action = action_model_query(session).get(action_id) + if not action: + raise exception.ResourceNotFound(type='action', id=action_id) - if owner: - return owner if owner == action.owner else action.owner - else: - return action.owner if action.owner else None + if owner: + return owner if owner == action.owner else action.owner + else: + return action.owner if action.owner else None @retry_on_deadlock @@ -1451,11 +1491,12 @@ def action_signal(context, action_id, value): def action_signal_query(context, action_id): - action = action_model_query().get(action_id) - if not action: - return None + with session_for_read() as session: + action = action_model_query(session).get(action_id) + if not action: + return None - return action.control + return action.control @retry_on_deadlock @@ -1467,7 +1508,6 @@ def action_delete(context, action_id): if ((action.status == consts.ACTION_WAITING) or (action.status == consts.ACTION_RUNNING) or (action.status == consts.ACTION_SUSPENDED)): - raise exception.EResourceBusy(type='action', id=action_id) session.delete(action) @@ -1517,10 +1557,9 @@ def action_purge(project, granularity='days', age=30): # Receivers -def receiver_model_query(): - with session_for_read() as session: - query = session.query(models.Receiver) - return query +def receiver_model_query(session): + query = session.query(models.Receiver) + return query @retry_on_deadlock @@ -1533,36 +1572,41 @@ def receiver_create(context, values): def receiver_get(context, receiver_id, project_safe=True): - receiver = receiver_model_query().get(receiver_id) - if not receiver: - return None + with session_for_read() as session: + receiver = receiver_model_query(session).get(receiver_id) + if not receiver: + return None - return utils.check_resource_project(context, receiver, project_safe) + return utils.check_resource_project(context, receiver, project_safe) def receiver_get_all(context, limit=None, marker=None, filters=None, sort=None, project_safe=True): - query = receiver_model_query() - query = utils.filter_query_by_project(query, project_safe, context) + with session_for_read() as session: + query = receiver_model_query(session) + query = utils.filter_query_by_project(query, project_safe, context) - if filters: - query = utils.exact_filter(query, models.Receiver, filters) + if filters: + query = utils.exact_filter(query, models.Receiver, filters) - keys, dirs = utils.get_sort_params(sort, consts.RECEIVER_NAME) - if marker: - marker = receiver_model_query().get(marker) - return sa_utils.paginate_query(query, models.Receiver, limit, keys, - marker=marker, sort_dirs=dirs).all() + keys, dirs = utils.get_sort_params(sort, consts.RECEIVER_NAME) + if marker: + marker = receiver_model_query(session).get(marker) + return sa_utils.paginate_query(query, models.Receiver, limit, keys, + marker=marker, sort_dirs=dirs).all() def receiver_get_by_name(context, name, project_safe=True): - return query_by_name(context, receiver_model_query, name, - project_safe=project_safe) + with session_for_read() as session: + return query_by_name(session, context, receiver_model_query, name, + project_safe=project_safe) def receiver_get_by_short_id(context, short_id, project_safe=True): - return query_by_short_id(context, receiver_model_query, models.Receiver, - short_id, 
project_safe=project_safe) + with session_for_read() as session: + return query_by_short_id(session, context, receiver_model_query, + models.Receiver, + short_id, project_safe=project_safe) @retry_on_deadlock @@ -1587,37 +1631,46 @@ def receiver_update(context, receiver_id, values): @retry_on_deadlock -def service_create(service_id, host=None, binary=None, topic=None): +def service_create(service_id, host=None, binary=None, topic=None, + time_now=None): with session_for_write() as session: - time_now = timeutils.utcnow(True) - svc = models.Service(id=service_id, host=host, binary=binary, - topic=topic, created_at=time_now, - updated_at=time_now) - session.add(svc) - return svc + if not time_now: + time_now = timeutils.utcnow(True) + service = models.Service(id=service_id, host=host, binary=binary, + topic=topic, created_at=time_now, + updated_at=time_now) + session.add(service) + return service @retry_on_deadlock def service_update(service_id, values=None): + if values is None: + values = {} + with session_for_write() as session: service = session.query(models.Service).get(service_id) if not service: return - if values is None: - values = {} - values.update({'updated_at': timeutils.utcnow(True)}) service.update(values) service.save(session) + return service @retry_on_deadlock +def _service_delete(session, service_id): + session.query(models.Service).filter_by( + id=service_id).delete( + synchronize_session='fetch' + ) + + def service_delete(service_id): with session_for_write() as session: - session.query(models.Service).filter_by( - id=service_id).delete(synchronize_session='fetch') + _service_delete(session, service_id) def service_get(service_id): @@ -1630,14 +1683,36 @@ def service_get_all(): return session.query(models.Service).all() -def service_get_all_expired(binary): - with session_for_read() as session: +def service_cleanup_all_expired(binary): + with session_for_write() as session: date_limit = service_expired_time() - svc = models.Service - return session.query(models.Service).filter( - and_(svc.binary == binary, svc.updated_at <= date_limit) + services = session.query(models.Service).filter( + and_(models.Service.binary == binary, + models.Service.updated_at <= date_limit) ) + for service in services: + LOG.info( + 'Breaking locks for dead service %(binary)s ' + '(id: %(service_id)s)', + { + 'binary': binary, + 'service_id': service['id'], + } + ) + _gc_by_engine(session, service['id']) + LOG.info( + 'Done breaking locks for service %(binary)s ' + '(id: %(service_id)s)', + { + 'binary': binary, + 'service_id': service['id'], + } + ) + _service_delete(session, service['id']) + + return services + @retry_on_deadlock def _mark_engine_failed(session, action_id, timestamp, reason=None): @@ -1688,31 +1763,35 @@ def dummy_gc(engine_id): @retry_on_deadlock -def gc_by_engine(engine_id): +def _gc_by_engine(session, engine_id): # Get all actions locked by an engine + + q_actions = session.query(models.Action).filter_by(owner=engine_id) + timestamp = time.time() + for a in q_actions.all(): + # Release all node locks + query = session.query(models.NodeLock).filter_by(action_id=a.id) + query.delete(synchronize_session=False) + + # Release all cluster locks + for cl in session.query(models.ClusterLock).all(): + res = _release_cluster_lock(session, cl, a.id, -1) + if not res: + _release_cluster_lock(session, cl, a.id, 1) + + # mark action failed and release lock + _mark_failed(session, a.id, timestamp, reason='Engine failure') + + +def gc_by_engine(engine_id): with session_for_write() as 
session: - q_actions = session.query(models.Action).filter_by(owner=engine_id) - timestamp = time.time() - for a in q_actions.all(): - # Release all node locks - query = session.query(models.NodeLock).filter_by(action_id=a.id) - query.delete(synchronize_session=False) - - # Release all cluster locks - for cl in session.query(models.ClusterLock).all(): - res = _release_cluster_lock(session, cl, a.id, -1) - if not res: - _release_cluster_lock(session, cl, a.id, 1) - - # mark action failed and release lock - _mark_failed(a.id, timestamp, reason="Engine failure") + _gc_by_engine(session, engine_id) # HealthRegistry -def health_registry_model_query(): - with session_for_read() as session: - query = session.query(models.HealthRegistry) - return query +def health_registry_model_query(session): + query = session.query(models.HealthRegistry) + return query @retry_on_deadlock @@ -1727,6 +1806,7 @@ def registry_create(context, cluster_id, check_type, interval, params, registry.engine_id = engine_id registry.enabled = enabled session.add(registry) + return registry @@ -1775,9 +1855,10 @@ def registry_get(context, cluster_id): def registry_get_by_param(context, params): - query = health_registry_model_query() - obj = utils.exact_filter(query, models.HealthRegistry, params).first() - return obj + with session_for_read() as session: + query = health_registry_model_query(session) + obj = utils.exact_filter(query, models.HealthRegistry, params).first() + return obj def registry_list_ids_by_service(context, engine_id): diff --git a/senlin/objects/service.py b/senlin/objects/service.py index 0439f3bd1..8a00195a1 100644 --- a/senlin/objects/service.py +++ b/senlin/objects/service.py @@ -49,9 +49,8 @@ class Service(base.SenlinObject, base.VersionedObjectDictCompat): return [cls._from_db_object(context, cls(), obj) for obj in objs] @classmethod - def get_all_expired(cls, context, binary): - objs = db_api.service_get_all_expired(binary) - return [cls._from_db_object(context, cls(), obj) for obj in objs] + def cleanup_all_expired(cls, binary): + db_api.service_cleanup_all_expired(binary) @classmethod def update(cls, context, obj_id, values=None): diff --git a/senlin/tests/unit/common/base.py b/senlin/tests/unit/common/base.py index f1693b98c..43ce4a58d 100644 --- a/senlin/tests/unit/common/base.py +++ b/senlin/tests/unit/common/base.py @@ -78,7 +78,9 @@ class DatabaseFixture(fixtures.Fixture): super(DatabaseFixture, self).__init__() self.golden_path = self.mktemp() self.golden_url = 'sqlite:///%s' % self.golden_path + db_api.db_sync(self.golden_url) + self.working_path = self.mktemp() self.working_url = 'sqlite:///%s' % self.working_path @@ -86,10 +88,6 @@ class DatabaseFixture(fixtures.Fixture): super(DatabaseFixture, self).setUp() shutil.copy(self.golden_path, self.working_path) - def cleanup(self): - if os.path.exists(self.working_path): - os.remove(self.working_path) - class SenlinTestCase(testscenarios.WithScenarios, testtools.TestCase, FakeLogMixin): @@ -114,7 +112,6 @@ class SenlinTestCase(testscenarios.WithScenarios, self.addCleanup(messaging.cleanup) self.db_fixture = self.useFixture(DatabaseFixture.get_fixture()) - self.addCleanup(self.db_fixture.cleanup) options.cfg.set_defaults( options.database_opts, sqlite_synchronous=False diff --git a/senlin/tests/unit/conductor/test_service.py b/senlin/tests/unit/conductor/test_service.py index 0c7f1b90d..d1397c56c 100644 --- a/senlin/tests/unit/conductor/test_service.py +++ b/senlin/tests/unit/conductor/test_service.py @@ -10,13 +10,11 @@ # License for the 
specific language governing permissions and limitations # under the License. -import datetime from unittest import mock import eventlet from oslo_config import cfg import oslo_messaging -from oslo_utils import timeutils from oslo_utils import uuidutils from senlin.common import consts @@ -151,25 +149,16 @@ class ConductorCleanupTest(base.SenlinTestCase): self.assertGreater(mock_update.call_count, 1) self.svc.stop() - @mock.patch.object(service_obj.Service, 'gc_by_engine') - @mock.patch.object(service_obj.Service, 'get_all_expired') - @mock.patch.object(service_obj.Service, 'delete') - def test_service_manage_cleanup(self, mock_delete, mock_get_all_expired, - mock_gc): + @mock.patch.object(service_obj.Service, 'cleanup_all_expired') + def test_service_manage_cleanup(self, mock_cleanup): self.svc = service.ConductorService('HOST', self.topic) self.svc.service_id = self.service_id - delta = datetime.timedelta(seconds=2.2 * cfg.CONF.periodic_interval) - ages_a_go = timeutils.utcnow(True) - delta - mock_get_all_expired.return_value = [ - {'id': 'foo', 'updated_at': ages_a_go} - ] self.svc.service_manage_cleanup() - mock_delete.assert_called_once_with('foo') - mock_gc.assert_called_once_with('foo') + mock_cleanup.assert_called_once_with('senlin-conductor') - @mock.patch.object(service_obj.Service, 'get_all_expired') + @mock.patch.object(service_obj.Service, 'cleanup_all_expired') def test_service_manage_cleanup_without_exception(self, - mock_get_all_expired): + mock_cleanup): cfg.CONF.set_override('periodic_interval', 0.1) self.svc = service.ConductorService('HOST', self.topic) @@ -178,11 +167,11 @@ class ConductorCleanupTest(base.SenlinTestCase): # start engine and verify that get_all is being called more than once self.svc.start() eventlet.sleep(0.6) - self.assertGreater(mock_get_all_expired.call_count, 1) self.svc.stop() + mock_cleanup.assert_called() - @mock.patch.object(service_obj.Service, 'get_all_expired') - def test_service_manage_cleanup_with_exception(self, mock_get_all_expired): + @mock.patch.object(service_obj.Service, 'cleanup_all_expired') + def test_service_manage_cleanup_with_exception(self, mock_cleanup): cfg.CONF.set_override('periodic_interval', 0.1) self.svc = service.ConductorService('HOST', self.topic) @@ -190,8 +179,8 @@ class ConductorCleanupTest(base.SenlinTestCase): # start engine and verify that get_all is being called more than once # even with the exception being thrown - mock_get_all_expired.side_effect = Exception('blah') + mock_cleanup.side_effect = Exception('blah') self.svc.start() eventlet.sleep(0.6) - self.assertGreater(mock_get_all_expired.call_count, 1) self.svc.stop() + mock_cleanup.assert_called() diff --git a/senlin/tests/unit/db/test_service_api.py b/senlin/tests/unit/db/test_service_api.py index 4dc093b8c..c15f12986 100644 --- a/senlin/tests/unit/db/test_service_api.py +++ b/senlin/tests/unit/db/test_service_api.py @@ -15,7 +15,6 @@ from oslo_utils import timeutils from oslo_utils import uuidutils from senlin.db.sqlalchemy import api as db_api -from senlin.db.sqlalchemy import models from senlin.tests.unit.common import base from senlin.tests.unit.common import utils @@ -34,19 +33,12 @@ class DBAPIServiceTest(base.SenlinTestCase): } values.update(kwargs) - with db_api.session_for_write() as session: - time_now = timeutils.utcnow(True) - svc = models.Service( - id=service_id, - host=values.get('host'), - binary=values.get('binary'), - topic=values.get('topic'), - created_at=values.get('created_at') or time_now, - updated_at=values.get('updated_at') or 
time_now, - ) - session.add(svc) - - return svc + return db_api.service_create( + service_id, host=values.get('host'), + binary=values.get('binary'), + topic=values.get('topic'), + time_now=kwargs.get('time_now') + ) def test_service_create_get(self): service = self._create_service() @@ -74,32 +66,31 @@ class DBAPIServiceTest(base.SenlinTestCase): self.assertEqual(4, len(services)) def test_service_get_all_expired(self): - for index in range(6): - dt = timeutils.utcnow() - datetime.timedelta(seconds=60 * index) + for index in range(3): + dt = timeutils.utcnow() - datetime.timedelta(hours=8) values = { 'binary': 'senlin-health-manager', - 'host': 'host-%s' % index, - 'updated_at': dt + 'host': 'host-0-%s' % index, + 'time_now': dt } self._create_service(uuidutils.generate_uuid(), **values) - for index in range(8): - dt = timeutils.utcnow() - datetime.timedelta(seconds=60 * index) + for index in range(3): + dt = timeutils.utcnow() values = { - 'binary': 'senlin-engine', - 'host': 'host-%s' % index, - 'updated_at': dt + 'binary': 'senlin-health-manager', + 'host': 'host-1-%s' % index, + 'time_now': dt } self._create_service(uuidutils.generate_uuid(), **values) - services = db_api.service_get_all_expired('senlin-health-manager') - self.assertEqual(3, len(services.all())) + db_api.service_cleanup_all_expired('senlin-health-manager') - services = db_api.service_get_all_expired('senlin-engine') - self.assertEqual(5, len(services.all())) + self.assertEqual(3, len(db_api.service_get_all())) def test_service_update(self): old_service = self._create_service() + self.assertIsNotNone(old_service) old_updated_time = old_service.updated_at values = {'host': 'host-updated'}