bilean/bilean/db/sqlalchemy/api.py

878 lines
25 KiB
Python

#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
'''Implementation of SQLAlchemy backend.'''
import six
import sys
from oslo_config import cfg
from oslo_db.sqlalchemy import session as db_session
from oslo_db.sqlalchemy import utils
from oslo_log import log as logging
from oslo_utils import timeutils
from sqlalchemy.orm.session import Session
from bilean.common import consts
from bilean.common import exception
from bilean.common.i18n import _
from bilean.db.sqlalchemy import filters as db_filters
from bilean.db.sqlalchemy import migration
from bilean.db.sqlalchemy import models
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
_facade = None
def get_facade():
global _facade
if not _facade:
_facade = db_session.EngineFacade.from_config(CONF)
return _facade
get_engine = lambda: get_facade().get_engine()
get_session = lambda: get_facade().get_session()
def get_backend():
"""The backend is this module itself."""
return sys.modules[__name__]
def model_query(context, *args):
session = _session(context)
query = session.query(*args)
return query
def _get_sort_keys(sort_keys, mapping):
'''Returns an array containing only whitelisted keys
:param sort_keys: an array of strings
:param mapping: a mapping from keys to DB column names
:returns: filtered list of sort keys
'''
if isinstance(sort_keys, six.string_types):
sort_keys = [sort_keys]
return [mapping[key] for key in sort_keys or [] if key in mapping]
def _paginate_query(context, query, model, limit=None, marker=None,
sort_keys=None, sort_dir=None, default_sort_keys=None):
if not sort_keys:
sort_keys = default_sort_keys or []
if not sort_dir:
sort_dir = 'asc'
model_marker = None
if marker:
model_marker = model_query(context, model).get(marker)
try:
query = utils.paginate_query(query, model, limit, sort_keys,
model_marker, sort_dir)
except utils.InvalidSortKey:
raise exception.InvalidParameter(name='sort_keys', value=sort_keys)
return query
def soft_delete_aware_query(context, *args, **kwargs):
"""Query helper that accounts for context's `show_deleted` field.
:param show_deleted: if True, overrides context's show_deleted field.
"""
query = model_query(context, *args)
show_deleted = kwargs.get('show_deleted') or context.show_deleted
if not show_deleted:
query = query.filter_by(deleted_at=None)
return query
def _session(context):
return (context and context.session) or get_session()
def db_sync(engine, version=None):
"""Migrate the database to `version` or the most recent version."""
return migration.db_sync(engine, version=version)
def db_version(engine):
"""Display the current database version."""
return migration.db_version(engine)
# users
def user_get(context, user_id, show_deleted=False, project_safe=True):
query = model_query(context, models.User)
user = query.get(user_id)
deleted_ok = show_deleted or context.show_deleted
if user is None or user.deleted_at is not None and not deleted_ok:
return None
if project_safe and context.project != user.id:
return None
return user
def user_update(context, user_id, values):
user = user_get(context, user_id, project_safe=False)
if user is None:
raise exception.UserNotFound(user=user_id)
user.update(values)
user.save(_session(context))
return user
def user_create(context, values):
user_ref = models.User()
user_ref.update(values)
user_ref.save(_session(context))
return user_ref
def user_delete(context, user_id):
session = _session(context)
user = user_get(context, user_id)
if user is None:
return
# Delete all related resource records
for resource in user.resources:
session.delete(resource)
# Delete all related event records
for event in user.events:
session.delete(event)
user.soft_delete(session=session)
session.flush()
def user_get_all(context, show_deleted=False, limit=None,
marker=None, sort_keys=None, sort_dir=None,
filters=None):
query = soft_delete_aware_query(context, models.User,
show_deleted=show_deleted)
if filters is None:
filters = {}
sort_key_map = {
consts.USER_CREATED_AT: models.User.created_at.key,
consts.USER_UPDATED_AT: models.User.updated_at.key,
consts.USER_NAME: models.User.name.key,
consts.USER_BALANCE: models.User.balance.key,
consts.USER_STATUS: models.User.status.key,
}
keys = _get_sort_keys(sort_keys, sort_key_map)
query = db_filters.exact_filter(query, models.User, filters)
return _paginate_query(context, query, models.User,
limit=limit, marker=marker,
sort_keys=keys, sort_dir=sort_dir,
default_sort_keys=['id']).all()
# rules
def rule_get(context, rule_id, show_deleted=False):
query = model_query(context, models.Rule)
rule = query.filter_by(id=rule_id).first()
deleted_ok = show_deleted or context.show_deleted
if rule is None or rule.deleted_at is not None and not deleted_ok:
return None
return rule
def rule_get_all(context, show_deleted=False, limit=None,
marker=None, sort_keys=None, sort_dir=None,
filters=None):
query = soft_delete_aware_query(context, models.Rule,
show_deleted=show_deleted)
if filters is None:
filters = {}
sort_key_map = {
consts.RULE_NAME: models.Rule.name.key,
consts.RULE_TYPE: models.Rule.type.key,
consts.RULE_CREATED_AT: models.Rule.created_at.key,
consts.RULE_UPDATED_AT: models.Rule.updated_at.key,
}
keys = _get_sort_keys(sort_keys, sort_key_map)
query = db_filters.exact_filter(query, models.Rule, filters)
return _paginate_query(context, query, models.Rule,
limit=limit, marker=marker,
sort_keys=keys, sort_dir=sort_dir,
default_sort_keys=['id']).all()
def rule_create(context, values):
rule_ref = models.Rule()
rule_ref.update(values)
rule_ref.save(_session(context))
return rule_ref
def rule_update(context, rule_id, values):
rule = rule_get(context, rule_id)
if rule is None:
raise exception.RuleNotFound(rule=rule_id)
rule.update(values)
rule.save(_session(context))
def rule_delete(context, rule_id):
rule = rule_get(context, rule_id)
if rule is None:
return
session = Session.object_session(rule)
rule.soft_delete(session=session)
session.flush()
# resources
def resource_get(context, resource_id, show_deleted=False, project_safe=True):
query = model_query(context, models.Resource)
resource = query.get(resource_id)
deleted_ok = show_deleted or context.show_deleted
if resource is None or resource.deleted_at is not None and not deleted_ok:
return None
if project_safe and context.project != resource.user_id:
return None
return resource
def resource_get_all(context, user_id=None, show_deleted=False,
limit=None, marker=None, sort_keys=None, sort_dir=None,
filters=None, project_safe=True):
query = soft_delete_aware_query(context, models.Resource,
show_deleted=show_deleted)
if project_safe:
query = query.filter_by(user_id=context.project)
elif user_id:
query = query.filter_by(user_id=user_id)
if filters is None:
filters = {}
sort_key_map = {
consts.RES_CREATED_AT: models.Resource.created_at.key,
consts.RES_UPDATED_AT: models.Resource.updated_at.key,
consts.RES_RESOURCE_TYPE: models.Resource.resource_type.key,
consts.RES_USER_ID: models.Resource.user_id.key,
}
keys = _get_sort_keys(sort_keys, sort_key_map)
query = db_filters.exact_filter(query, models.Resource, filters)
return _paginate_query(context, query, models.Resource,
limit=limit, marker=marker,
sort_keys=keys, sort_dir=sort_dir,
default_sort_keys=['id']).all()
def resource_create(context, values):
resource_ref = models.Resource()
resource_ref.update(values)
resource_ref.save(_session(context))
return resource_ref
def resource_update(context, resource_id, values):
project_safe = True
if context.is_admin:
project_safe = False
resource = resource_get(context, resource_id, show_deleted=True,
project_safe=project_safe)
if resource is None:
raise exception.ResourceNotFound(resource=resource_id)
resource.update(values)
resource.save(_session(context))
return resource
def resource_delete(context, resource_id, soft_delete=True):
resource = resource_get(context, resource_id, project_safe=False)
if resource is None:
return
session = Session.object_session(resource)
if soft_delete:
resource.soft_delete(session=session)
else:
session.delete(resource)
session.flush()
# events
def event_get(context, event_id, project_safe=True):
query = model_query(context, models.Event)
event = query.get(event_id)
if event is None:
return None
if project_safe and context.project != event.user_id:
return None
return event
def event_get_all(context, user_id=None, limit=None, marker=None,
sort_keys=None, sort_dir=None, filters=None,
start_time=None, end_time=None, project_safe=True,
show_deleted=False):
query = soft_delete_aware_query(context, models.Event,
show_deleted=show_deleted)
if project_safe:
query = query.filter_by(user_id=context.project)
elif user_id:
query = query.filter_by(user_id=user_id)
if start_time:
query = query.filter_by(models.Event.timestamp >= start_time)
if end_time:
query = query.filter_by(models.Event.timestamp <= end_time)
if filters is None:
filters = {}
sort_key_map = {
consts.EVENT_ACTION: models.Event.action.key,
consts.EVENT_RESOURCE_TYPE: models.Event.resource_type.key,
consts.EVENT_TIMESTAMP: models.Event.timestamp.key,
consts.EVENT_USER_ID: models.Event.user_id.key,
}
keys = _get_sort_keys(sort_keys, sort_key_map)
query = db_filters.exact_filter(query, models.Event, filters)
return _paginate_query(context, query, models.Event,
limit=limit, marker=marker,
sort_keys=keys, sort_dir=sort_dir,
default_sort_keys=['id']).all()
def event_create(context, values):
event_ref = models.Event()
event_ref.update(values)
event_ref.save(_session(context))
return event_ref
# jobs
def job_create(context, values):
job_ref = models.Job()
job_ref.update(values)
job_ref.save(_session(context))
return job_ref
def job_get_all(context, scheduler_id=None):
query = model_query(context, models.Job)
if scheduler_id:
query = query.filter_by(scheduler_id=scheduler_id)
return query.all()
def job_delete(context, job_id):
job = model_query(context, models.Job).get(job_id)
if job is None:
return
session = Session.object_session(job)
session.delete(job)
session.flush()
# policies
def policy_get(context, policy_id, show_deleted=False):
query = model_query(context, models.Policy)
policy = query.get(policy_id)
deleted_ok = show_deleted or context.show_deleted
if policy is None or policy.deleted_at is not None and not deleted_ok:
return None
return policy
def policy_get_all(context, limit=None, marker=None,
sort_keys=None, sort_dir=None,
filters=None, show_deleted=False):
query = soft_delete_aware_query(context, models.Policy,
show_deleted=show_deleted)
if filters is None:
filters = {}
sort_key_map = {
consts.POLICY_NAME: models.Policy.name.key,
consts.POLICY_CREATED_AT: models.Policy.created_at.key,
consts.POLICY_UPDATED_AT: models.Policy.updated_at.key,
}
keys = _get_sort_keys(sort_keys, sort_key_map)
query = db_filters.exact_filter(query, models.Policy, filters)
return _paginate_query(context, query, models.Policy,
limit=limit, marker=marker,
sort_keys=keys, sort_dir=sort_dir,
default_sort_keys=['id']).all()
def policy_create(context, values):
policy_ref = models.Policy()
policy_ref.update(values)
policy_ref.save(_session(context))
return policy_ref
def policy_update(context, policy_id, values):
policy = policy_get(context, policy_id)
if policy is None:
raise exception.PolicyNotFound(policy=policy_id)
policy.update(values)
policy.save(_session(context))
def policy_delete(context, policy_id):
policy = policy_get(context, policy_id)
if policy is None:
return
session = Session.object_session(policy)
policy.soft_delete(session=session)
session.flush()
# locks
def user_lock_acquire(user_id, action_id):
session = get_session()
session.begin()
lock = session.query(models.UserLock).get(user_id)
if lock is None:
lock = models.UserLock(user_id=user_id, action_id=action_id)
session.add(lock)
session.commit()
return lock.action_id
def user_lock_release(user_id, action_id):
session = get_session()
session.begin()
success = False
lock = session.query(models.UserLock).get(user_id)
if lock is not None and lock.action_id == action_id:
session.delete(lock)
success = True
session.commit()
return success
def user_lock_steal(user_id, action_id):
session = get_session()
session.begin()
lock = session.query(models.UserLock).get(user_id)
if lock is not None:
lock.action_id = action_id
lock.save(session)
else:
lock = models.UserLock(user_id=user_id, action_id=action_id)
session.add(lock)
session.commit()
return lock.action_id
# actions
def action_create(context, values):
action = models.Action()
action.update(values)
action.save(_session(context))
return action
def action_update(context, action_id, values):
session = get_session()
action = session.query(models.Action).get(action_id)
if not action:
raise exception.ActionNotFound(action=action_id)
action.update(values)
action.save(session)
def action_get(context, action_id, project_safe=True, refresh=False):
session = _session(context)
action = session.query(models.Action).get(action_id)
if action is None:
return None
if not context.is_admin and project_safe:
if action.project != context.project:
return None
session.refresh(action)
return action
def action_get_all_by_owner(context, owner_id):
query = model_query(context, models.Action).\
filter_by(owner=owner_id)
return query.all()
def action_get_all(context, filters=None, limit=None, marker=None,
sort_keys=None, sort_dir=None):
query = model_query(context, models.Action)
if filters:
query = db_filters.exact_filter(query, models.Action, filters)
sort_key_map = {
consts.ACTION_CREATED_AT: models.Action.created_at.key,
consts.ACTION_UPDATED_AT: models.Action.updated_at.key,
consts.ACTION_NAME: models.Action.name.key,
consts.ACTION_STATUS: models.Action.status.key,
}
keys = _get_sort_keys(sort_keys, sort_key_map)
query = db_filters.exact_filter(query, models.Action, filters)
return _paginate_query(context, query, models.Action,
limit=limit, marker=marker,
sort_keys=keys, sort_dir=sort_dir,
default_sort_keys=['id']).all()
def action_check_status(context, action_id, timestamp):
session = _session(context)
q = session.query(models.ActionDependency)
count = q.filter_by(dependent=action_id).count()
if count > 0:
return consts.ACTION_WAITING
action = session.query(models.Action).get(action_id)
if action.status == consts.ACTION_WAITING:
session.begin()
action.status = consts.ACTION_READY
action.status_reason = _('All depended actions completed.')
action.end_time = timestamp
action.save(session)
session.commit()
return action.status
def dependency_get_depended(context, action_id):
session = _session(context)
q = session.query(models.ActionDependency).filter_by(dependent=action_id)
return [d.depended for d in q.all()]
def dependency_get_dependents(context, action_id):
session = _session(context)
q = session.query(models.ActionDependency).filter_by(depended=action_id)
return [d.dependent for d in q.all()]
def dependency_add(context, depended, dependent):
if isinstance(depended, list) and isinstance(dependent, list):
raise exception.NotSupport(
_('Multiple dependencies between lists not support'))
session = _session(context)
if isinstance(depended, list):
session.begin()
for d in depended:
r = models.ActionDependency(depended=d, dependent=dependent)
session.add(r)
query = session.query(models.Action).filter_by(id=dependent)
query.update({'status': consts.ACTION_WAITING,
'status_reason': _('Waiting for depended actions.')},
synchronize_session=False)
session.commit()
return
# Only dependent can be a list now, convert it to a list if it
# is not a list
if not isinstance(dependent, list): # e.g. B,C,D depend on A
dependents = [dependent]
else:
dependents = dependent
session.begin()
for d in dependents:
r = models.ActionDependency(depended=depended, dependent=d)
session.add(r)
q = session.query(models.Action).filter(models.Action.id.in_(dependents))
q.update({'status': consts.ACTION_WAITING,
'status_reason': _('Waiting for depended actions.')},
synchronize_session=False)
session.commit()
def action_mark_succeeded(context, action_id, timestamp):
session = _session(context)
session.begin()
query = session.query(models.Action).filter_by(id=action_id)
values = {
'owner': None,
'status': consts.ACTION_SUCCEEDED,
'status_reason': _('Action completed successfully.'),
'end_time': timestamp,
}
query.update(values, synchronize_session=False)
subquery = session.query(models.ActionDependency).filter_by(
depended=action_id)
subquery.delete(synchronize_session=False)
session.commit()
def _mark_failed(session, action_id, timestamp, reason=None):
# mark myself as failed
query = session.query(models.Action).filter_by(id=action_id)
values = {
'owner': None,
'status': consts.ACTION_FAILED,
'status_reason': (six.text_type(reason) if reason else
_('Action execution failed')),
'end_time': timestamp,
}
query.update(values, synchronize_session=False)
query = session.query(models.ActionDependency)
query = query.filter_by(depended=action_id)
dependents = [d.dependent for d in query.all()]
query.delete(synchronize_session=False)
for d in dependents:
_mark_failed(session, d, timestamp)
def action_mark_failed(context, action_id, timestamp, reason=None):
session = _session(context)
session.begin()
_mark_failed(session, action_id, timestamp, reason)
session.commit()
def _mark_cancelled(session, action_id, timestamp, reason=None):
query = session.query(models.Action).filter_by(id=action_id)
values = {
'owner': None,
'status': consts.ACTION_CANCELLED,
'status_reason': (six.text_type(reason) if reason else
_('Action execution failed')),
'end_time': timestamp,
}
query.update(values, synchronize_session=False)
query = session.query(models.ActionDependency)
query = query.filter_by(depended=action_id)
dependents = [d.dependent for d in query.all()]
query.delete(synchronize_session=False)
for d in dependents:
_mark_cancelled(session, d, timestamp)
def action_mark_cancelled(context, action_id, timestamp, reason=None):
session = _session(context)
session.begin()
_mark_cancelled(session, action_id, timestamp, reason)
session.commit()
def action_acquire(context, action_id, owner, timestamp):
session = _session(context)
with session.begin():
action = session.query(models.Action).get(action_id)
if not action:
return None
if action.owner and action.owner != owner:
return None
if action.status != consts.ACTION_READY:
msg = _('The action is not in an executable status: '
'%s') % action.status
LOG.warning(msg)
return None
action.owner = owner
action.start_time = timestamp
action.status = consts.ACTION_RUNNING
action.status_reason = _('The action is being processed.')
return action
def action_acquire_first_ready(context, owner, timestamp):
session = _session(context)
with session.begin():
action = session.query(models.Action).\
filter_by(status=consts.ACTION_READY).\
filter_by(owner=None).first()
if action:
action.owner = owner
action.start_time = timestamp
action.status = consts.ACTION_RUNNING
action.status_reason = _('The action is being processed.')
return action
def action_abandon(context, action_id):
'''Abandon an action for other workers to execute again.
This API is always called with the action locked by the current
worker. There is no chance the action is gone or stolen by others.
'''
query = model_query(context, models.Action)
action = query.get(action_id)
action.owner = None
action.start_time = None
action.status = consts.ACTION_READY
action.status_reason = _('The action was abandoned.')
action.save(query.session)
return action
def action_lock_check(context, action_id, owner=None):
action = model_query(context, models.Action).get(action_id)
if not action:
raise exception.ActionNotFound(action=action_id)
if owner:
return owner if owner == action.owner else action.owner
else:
return action.owner if action.owner else None
def action_signal(context, action_id, value):
query = model_query(context, models.Action)
action = query.get(action_id)
if not action:
return
action.control = value
action.save(query.session)
def action_signal_query(context, action_id):
action = model_query(context, models.Action).get(action_id)
if not action:
return None
return action.control
def action_delete(context, action_id, force=False):
session = _session(context)
action = session.query(models.Action).get(action_id)
if not action:
return
if ((action.status == 'WAITING') or (action.status == 'RUNNING') or
(action.status == 'SUSPENDED')):
raise exception.ResourceBusyError(resource_type='action',
resource_id=action_id)
session.begin()
session.delete(action)
session.commit()
session.flush()
# services
def service_create(context, host, binary, topic=None):
time_now = timeutils.utcnow()
svc = models.Service(host=host, binary=binary,
topic=topic, created_at=time_now,
updated_at=time_now)
svc.save(_session(context))
return svc
def service_update(context, service_id, values=None):
service = service_get(context, service_id)
if not service:
return
if values is None:
values = {}
values.update({'updated_at': timeutils.utcnow()})
service.update(values)
service.save(_session(context))
return service
def service_delete(context, service_id):
session = _session(context)
session.query(models.Service).filter_by(
id=service_id).delete(synchronize_session='fetch')
def service_get(context, service_id):
return model_query(context, models.Service).get(service_id)
def service_get_by_host_and_binary(context, host, binary):
query = model_query(context, models.Service)
return query.filter_by(host=host).filter_by(binary=binary).first()
def service_get_all(context):
return model_query(context, models.Service).all()