Add actions

Make every operation as an action, and divide an action into
serveral atom task, this make sure that every operation could
be correctly executed.

Change-Id: I99ad1400936dc7236b7c55bcf67239dedbca0f48
This commit is contained in:
lvdongbing 2016-03-29 23:49:01 -04:00
parent 1782a3d694
commit 1e540840c3
22 changed files with 1934 additions and 310 deletions

View File

@ -42,9 +42,6 @@ service_opts = [
cfg.IntOpt('num_engine_workers',
default=processutils.get_worker_count(),
help=_('Number of heat-engine processes to fork and run.')),
cfg.StrOpt('environment_dir',
default='/etc/bilean/environments',
help=_('The directory to search for environment files.')),
]
engine_opts = [

View File

@ -21,12 +21,30 @@ RPC_ATTRs = (
ENGINE_TOPIC,
SCHEDULER_TOPIC,
NOTIFICATION_TOPICS,
ENGINE_DISPATCHER_TOPIC,
RPC_API_VERSION,
) = (
'bilean-engine',
'bilean-scheduler',
'billing_notifications',
'1.1',
'bilean_engine_dispatcher',
'1.0',
)
ACTION_NAMES = (
USER_CREATE_RESOURCE, USER_UPDATE_RESOURCE, USER_DELETE_RESOURCE,
USER_SETTLE_ACCOUNT,
) = (
'USER_CREATE_RESOURCE', 'USER_UPDATE_RESOURCE', 'USER_DELETE_RESOURCE',
'USER_SETTLE_ACCOUNT',
)
ACTION_STATUSES = (
ACTION_INIT, ACTION_WAITING, ACTION_READY, ACTION_RUNNING,
ACTION_SUCCEEDED, ACTION_FAILED, ACTION_CANCELLED
) = (
'INIT', 'WAITING', 'READY', 'RUNNING',
'SUCCEEDED', 'FAILED', 'CANCELLED',
)
RPC_PARAMS = (

View File

@ -88,6 +88,10 @@ class MultipleChoices(BileanException):
"Please be more specific.")
class InvalidInput(BileanException):
msg_fmt = _("Invalid value '%(value)s' specified for '%(name)s'")
class InvalidParameter(BileanException):
msg_fmt = _("Invalid value '%(value)s' specified for '%(name)s'")
@ -108,12 +112,8 @@ class RuleNotSpecified(BileanException):
msg_fmt = _("Rule not specified.")
class RuleOperationFailed(BileanException):
msg_fmt = _("%(message)s")
class RuleOperationTimeout(BileanException):
msg_fmt = _("%(message)s")
class ActionNotFound(BileanException):
msg_fmt = _("The action (%(action)s) could not be found.")
class PolicyNotFound(BileanException):

View File

@ -14,6 +14,7 @@
Utilities module.
'''
import datetime
import random
import string
@ -151,7 +152,7 @@ def random_name(length=8):
def format_time(value):
"""Cut microsecond and format to isoformat string."""
if value:
if isinstance(value, datetime.datetime):
value = value.replace(microsecond=0)
value = value.isoformat()
return value

View File

@ -120,8 +120,8 @@ def resource_update(context, resource_id, values):
return IMPL.resource_update(context, resource_id, values)
def resource_delete(context, resource_id):
IMPL.resource_delete(context, resource_id)
def resource_delete(context, resource_id, soft_delete=True):
IMPL.resource_delete(context, resource_id, soft_delete=soft_delete)
# events
@ -187,9 +187,122 @@ def policy_delete(context, policy_id):
# locks
def user_lock_acquire(user_id, engine_id):
return IMPL.user_lock_acquire(user_id, engine_id)
def user_lock_acquire(user_id, action_id):
return IMPL.user_lock_acquire(user_id, action_id)
def user_lock_release(user_id, engine_id=None):
return IMPL.user_lock_release(user_id, engine_id=engine_id)
def user_lock_release(user_id, action_id):
return IMPL.user_lock_release(user_id, action_id)
def user_lock_steal(user_id, action_id):
return IMPL.user_lock_steal(user_id, action_id)
# actions
def action_create(context, values):
return IMPL.action_create(context, values)
def action_update(context, action_id, values):
return IMPL.action_update(context, action_id, values)
def action_get(context, action_id, project_safe=True, refresh=False):
return IMPL.action_get(context, action_id, project_safe=project_safe,
refresh=refresh)
def action_get_all_by_owner(context, owner):
return IMPL.action_get_all_by_owner(context, owner)
def action_get_all(context, filters=None, limit=None, marker=None, sort=None,
project_safe=True):
return IMPL.action_get_all(context, filters=filters, sort=sort,
limit=limit, marker=marker,
project_safe=project_safe)
def action_check_status(context, action_id, timestamp):
return IMPL.action_check_status(context, action_id, timestamp)
def dependency_add(context, depended, dependent):
return IMPL.dependency_add(context, depended, dependent)
def dependency_get_depended(context, action_id):
return IMPL.dependency_get_depended(context, action_id)
def dependency_get_dependents(context, action_id):
return IMPL.dependency_get_dependents(context, action_id)
def action_mark_succeeded(context, action_id, timestamp):
return IMPL.action_mark_succeeded(context, action_id, timestamp)
def action_mark_failed(context, action_id, timestamp, reason=None):
return IMPL.action_mark_failed(context, action_id, timestamp, reason)
def action_mark_cancelled(context, action_id, timestamp):
return IMPL.action_mark_cancelled(context, action_id, timestamp)
def action_acquire(context, action_id, owner, timestamp):
return IMPL.action_acquire(context, action_id, owner, timestamp)
def action_acquire_first_ready(context, owner, timestamp):
return IMPL.action_acquire_first_ready(context, owner, timestamp)
def action_abandon(context, action_id):
return IMPL.action_abandon(context, action_id)
def action_lock_check(context, action_id, owner=None):
'''Check whether an action has been locked(by a owner).'''
return IMPL.action_lock_check(context, action_id, owner)
def action_signal(context, action_id, value):
'''Send signal to an action via DB.'''
return IMPL.action_signal(context, action_id, value)
def action_signal_query(context, action_id):
'''Query signal status for the sepcified action.'''
return IMPL.action_signal_query(context, action_id)
def action_delete(context, action_id, force=False):
return IMPL.action_delete(context, action_id, force)
# services
def service_create(context, host, binary, topic=None):
return IMPL.service_create(context, host, binary, topic=topic)
def service_update(context, service_id, values=None):
return IMPL.service_update(context, service_id, values=values)
def service_delete(context, service_id):
return IMPL.service_delete(context, service_id)
def service_get(context, service_id):
return IMPL.service_get(context, service_id)
def service_get_by_host_and_binary(context, host, binary):
return IMPL.service_get_by_host_and_binary(context, host, binary)
def service_get_all(context):
return IMPL.service_get_all(context)

View File

@ -19,12 +19,13 @@ from oslo_config import cfg
from oslo_db.sqlalchemy import session as db_session
from oslo_db.sqlalchemy import utils
from oslo_log import log as logging
from oslo_utils import timeutils
from sqlalchemy.orm.session import Session
from bilean.common import consts
from bilean.common import exception
from bilean.common.i18n import _LE
from bilean.common.i18n import _
from bilean.db.sqlalchemy import filters as db_filters
from bilean.db.sqlalchemy import migration
from bilean.db.sqlalchemy import models
@ -224,7 +225,7 @@ def rule_get_all(context, show_deleted=False, limit=None,
return _paginate_query(context, query, models.Rule,
limit=limit, marker=marker,
sort_keys=keys, sort_dir=sort_dir,
default_sort_keys=['created_at']).all()
default_sort_keys=['id']).all()
def rule_create(context, values):
@ -296,7 +297,7 @@ def resource_get_all(context, user_id=None, show_deleted=False,
return _paginate_query(context, query, models.Resource,
limit=limit, marker=marker,
sort_keys=keys, sort_dir=sort_dir,
default_sort_keys=['created_at']).all()
default_sort_keys=['id']).all()
def resource_create(context, values):
@ -307,7 +308,11 @@ def resource_create(context, values):
def resource_update(context, resource_id, values):
resource = resource_get(context, resource_id)
project_safe = True
if context.is_admin:
project_safe = False
resource = resource_get(context, resource_id, show_deleted=True,
project_safe=project_safe)
if resource is None:
raise exception.ResourceNotFound(resource=resource_id)
@ -377,7 +382,7 @@ def event_get_all(context, user_id=None, limit=None, marker=None,
return _paginate_query(context, query, models.Event,
limit=limit, marker=marker,
sort_keys=keys, sort_dir=sort_dir,
default_sort_keys=['timestamp']).all()
default_sort_keys=['id']).all()
def event_create(context, values):
@ -478,42 +483,394 @@ def policy_delete(context, policy_id):
# locks
def user_lock_acquire(user_id, engine_id):
'''Acquire lock on a user.
def user_lock_acquire(user_id, action_id):
session = get_session()
session.begin()
:param user_id: ID of the user.
:param engine_id: ID of the engine which wants to lock the user.
:return: A user lock if success else False.
'''
lock = session.query(models.UserLock).get(user_id)
if lock is None:
lock = models.UserLock(user_id=user_id, action_id=action_id)
session.add(lock)
session.commit()
return lock.action_id
def user_lock_release(user_id, action_id):
session = get_session()
session.begin()
success = False
lock = session.query(models.UserLock).get(user_id)
if lock is not None and lock.action_id == action_id:
session.delete(lock)
success = True
session.commit()
return success
def user_lock_steal(user_id, action_id):
session = get_session()
session.begin()
lock = session.query(models.UserLock).get(user_id)
if lock is not None:
return False
lock.action_id = action_id
lock.save(session)
else:
try:
lock = models.UserLock(user_id=user_id, engine_id=engine_id)
session.add(lock)
except Exception as ex:
LOG.error(_LE('Error: %s'), six.text_type(ex))
return False
lock = models.UserLock(user_id=user_id, action_id=action_id)
session.add(lock)
session.commit()
return lock
return lock.action_id
def user_lock_release(user_id, engine_id=None):
'''Release lock on a user.
# actions
def action_create(context, values):
action = models.Action()
action.update(values)
action.save(_session(context))
return action
:param user_id: ID of the user.
:return: True indicates successful release, False indicates failure.
'''
def action_update(context, action_id, values):
session = get_session()
session.begin()
lock = session.query(models.UserLock).get(user_id)
if lock is None:
action = session.query(models.Action).get(action_id)
if not action:
raise exception.ActionNotFound(action=action_id)
action.update(values)
action.save(session)
def action_get(context, action_id, project_safe=True, refresh=False):
session = _session(context)
action = session.query(models.Action).get(action_id)
if action is None:
return None
if not context.is_admin and project_safe:
if action.project != context.project:
return None
session.refresh(action)
return action
def action_get_all_by_owner(context, owner_id):
query = model_query(context, models.Action).\
filter_by(owner=owner_id)
return query.all()
def action_get_all(context, filters=None, limit=None, marker=None,
sort_keys=None, sort_dir=None):
query = model_query(context, models.Action)
if filters:
query = db_filters.exact_filter(query, models.Action, filters)
sort_key_map = {
consts.ACTION_CREATED_AT: models.Action.created_at.key,
consts.ACTION_UPDATED_AT: models.Action.updated_at.key,
consts.ACTION_NAME: models.Action.name.key,
consts.ACTION_STATUS: models.Action.status.key,
}
keys = _get_sort_keys(sort_keys, sort_key_map)
query = db_filters.exact_filter(query, models.Action, filters)
return _paginate_query(context, query, models.Action,
limit=limit, marker=marker,
sort_keys=keys, sort_dir=sort_dir,
default_sort_keys=['id']).all()
def action_check_status(context, action_id, timestamp):
session = _session(context)
q = session.query(models.ActionDependency)
count = q.filter_by(dependent=action_id).count()
if count > 0:
return consts.ACTION_WAITING
action = session.query(models.Action).get(action_id)
if action.status == consts.ACTION_WAITING:
session.begin()
action.status = consts.ACTION_READY
action.status_reason = _('All depended actions completed.')
action.end_time = timestamp
action.save(session)
session.commit()
return False
session.delete(lock)
return action.status
def dependency_get_depended(context, action_id):
session = _session(context)
q = session.query(models.ActionDependency).filter_by(dependent=action_id)
return [d.depended for d in q.all()]
def dependency_get_dependents(context, action_id):
session = _session(context)
q = session.query(models.ActionDependency).filter_by(depended=action_id)
return [d.dependent for d in q.all()]
def dependency_add(context, depended, dependent):
if isinstance(depended, list) and isinstance(dependent, list):
raise exception.NotSupport(
_('Multiple dependencies between lists not support'))
session = _session(context)
if isinstance(depended, list):
session.begin()
for d in depended:
r = models.ActionDependency(depended=d, dependent=dependent)
session.add(r)
query = session.query(models.Action).filter_by(id=dependent)
query.update({'status': consts.ACTION_WAITING,
'status_reason': _('Waiting for depended actions.')},
synchronize_session=False)
session.commit()
return
# Only dependent can be a list now, convert it to a list if it
# is not a list
if not isinstance(dependent, list): # e.g. B,C,D depend on A
dependents = [dependent]
else:
dependents = dependent
session.begin()
for d in dependents:
r = models.ActionDependency(depended=depended, dependent=d)
session.add(r)
q = session.query(models.Action).filter(models.Action.id.in_(dependents))
q.update({'status': consts.ACTION_WAITING,
'status_reason': _('Waiting for depended actions.')},
synchronize_session=False)
session.commit()
return True
def action_mark_succeeded(context, action_id, timestamp):
session = _session(context)
session.begin()
query = session.query(models.Action).filter_by(id=action_id)
values = {
'owner': None,
'status': consts.ACTION_SUCCEEDED,
'status_reason': _('Action completed successfully.'),
'end_time': timestamp,
}
query.update(values, synchronize_session=False)
subquery = session.query(models.ActionDependency).filter_by(
depended=action_id)
subquery.delete(synchronize_session=False)
session.commit()
def _mark_failed(session, action_id, timestamp, reason=None):
# mark myself as failed
query = session.query(models.Action).filter_by(id=action_id)
values = {
'owner': None,
'status': consts.ACTION_FAILED,
'status_reason': (six.text_type(reason) if reason else
_('Action execution failed')),
'end_time': timestamp,
}
query.update(values, synchronize_session=False)
query = session.query(models.ActionDependency)
query = query.filter_by(depended=action_id)
dependents = [d.dependent for d in query.all()]
query.delete(synchronize_session=False)
for d in dependents:
_mark_failed(session, d, timestamp)
def action_mark_failed(context, action_id, timestamp, reason=None):
session = _session(context)
session.begin()
_mark_failed(session, action_id, timestamp, reason)
session.commit()
def _mark_cancelled(session, action_id, timestamp, reason=None):
query = session.query(models.Action).filter_by(id=action_id)
values = {
'owner': None,
'status': consts.ACTION_CANCELLED,
'status_reason': (six.text_type(reason) if reason else
_('Action execution failed')),
'end_time': timestamp,
}
query.update(values, synchronize_session=False)
query = session.query(models.ActionDependency)
query = query.filter_by(depended=action_id)
dependents = [d.dependent for d in query.all()]
query.delete(synchronize_session=False)
for d in dependents:
_mark_cancelled(session, d, timestamp)
def action_mark_cancelled(context, action_id, timestamp, reason=None):
session = _session(context)
session.begin()
_mark_cancelled(session, action_id, timestamp, reason)
session.commit()
def action_acquire(context, action_id, owner, timestamp):
session = _session(context)
with session.begin():
action = session.query(models.Action).get(action_id)
if not action:
return None
if action.owner and action.owner != owner:
return None
if action.status != consts.ACTION_READY:
msg = _('The action is not in an executable status: '
'%s') % action.status
LOG.warning(msg)
return None
action.owner = owner
action.start_time = timestamp
action.status = consts.ACTION_RUNNING
action.status_reason = _('The action is being processed.')
return action
def action_acquire_first_ready(context, owner, timestamp):
session = _session(context)
with session.begin():
action = session.query(models.Action).\
filter_by(status=consts.ACTION_READY).\
filter_by(owner=None).first()
if action:
action.owner = owner
action.start_time = timestamp
action.status = consts.ACTION_RUNNING
action.status_reason = _('The action is being processed.')
return action
def action_abandon(context, action_id):
'''Abandon an action for other workers to execute again.
This API is always called with the action locked by the current
worker. There is no chance the action is gone or stolen by others.
'''
query = model_query(context, models.Action)
action = query.get(action_id)
action.owner = None
action.start_time = None
action.status = consts.ACTION_READY
action.status_reason = _('The action was abandoned.')
action.save(query.session)
return action
def action_lock_check(context, action_id, owner=None):
action = model_query(context, models.Action).get(action_id)
if not action:
raise exception.ActionNotFound(action=action_id)
if owner:
return owner if owner == action.owner else action.owner
else:
return action.owner if action.owner else None
def action_signal(context, action_id, value):
query = model_query(context, models.Action)
action = query.get(action_id)
if not action:
return
action.control = value
action.save(query.session)
def action_signal_query(context, action_id):
action = model_query(context, models.Action).get(action_id)
if not action:
return None
return action.control
def action_delete(context, action_id, force=False):
session = _session(context)
action = session.query(models.Action).get(action_id)
if not action:
return
if ((action.status == 'WAITING') or (action.status == 'RUNNING') or
(action.status == 'SUSPENDED')):
raise exception.ResourceBusyError(resource_type='action',
resource_id=action_id)
session.begin()
session.delete(action)
session.commit()
session.flush()
# services
def service_create(context, host, binary, topic=None):
time_now = timeutils.utcnow()
svc = models.Service(host=host, binary=binary,
topic=topic, created_at=time_now,
updated_at=time_now)
svc.save(_session(context))
return svc
def service_update(context, service_id, values=None):
service = service_get(context, service_id)
if not service:
return
if values is None:
values = {}
values.update({'updated_at': timeutils.utcnow()})
service.update(values)
service.save(_session(context))
return service
def service_delete(context, service_id):
session = _session(context)
session.query(models.Service).filter_by(
id=service_id).delete(synchronize_session='fetch')
def service_get(context, service_id):
return model_query(context, models.Service).get(service_id)
def service_get_by_host_and_binary(context, host, binary):
query = model_query(context, models.Service)
return query.filter_by(host=host).filter_by(binary=binary).first()
def service_get_all(context):
return model_query(context, models.Service).all()

View File

@ -32,8 +32,8 @@ def upgrade(migrate_engine):
sqlalchemy.Column('rate', sqlalchemy.Float),
sqlalchemy.Column('credit', sqlalchemy.Integer),
sqlalchemy.Column('last_bill', sqlalchemy.DateTime),
sqlalchemy.Column('status', sqlalchemy.String(10)),
sqlalchemy.Column('status_reason', sqlalchemy.String(255)),
sqlalchemy.Column('status', sqlalchemy.String(255)),
sqlalchemy.Column('status_reason', sqlalchemy.Text),
sqlalchemy.Column('created_at', sqlalchemy.DateTime),
sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
sqlalchemy.Column('deleted_at', sqlalchemy.DateTime),
@ -109,11 +109,66 @@ def upgrade(migrate_engine):
mysql_charset='utf8'
)
action = sqlalchemy.Table(
'action', meta,
sqlalchemy.Column('id', sqlalchemy.String(36),
primary_key=True, nullable=False),
sqlalchemy.Column('name', sqlalchemy.String(63)),
sqlalchemy.Column('context', types.Dict),
sqlalchemy.Column('target', sqlalchemy.String(36)),
sqlalchemy.Column('action', sqlalchemy.String(255)),
sqlalchemy.Column('cause', sqlalchemy.String(255)),
sqlalchemy.Column('owner', sqlalchemy.String(36)),
sqlalchemy.Column('start_time', sqlalchemy.Float(precision='24,8')),
sqlalchemy.Column('end_time', sqlalchemy.Float(precision='24,8')),
sqlalchemy.Column('timeout', sqlalchemy.Integer),
sqlalchemy.Column('inputs', types.Dict),
sqlalchemy.Column('outputs', types.Dict),
sqlalchemy.Column('data', types.Dict),
sqlalchemy.Column('status', sqlalchemy.String(255)),
sqlalchemy.Column('status_reason', sqlalchemy.Text),
sqlalchemy.Column('created_at', sqlalchemy.DateTime),
sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
mysql_engine='InnoDB',
mysql_charset='utf8'
)
dependency = sqlalchemy.Table(
'dependency', meta,
sqlalchemy.Column('id', sqlalchemy.String(36),
primary_key=True, nullable=False),
sqlalchemy.Column('depended',
sqlalchemy.String(36),
sqlalchemy.ForeignKey('action.id'),
nullable=False),
sqlalchemy.Column('dependent',
sqlalchemy.String(36),
sqlalchemy.ForeignKey('action.id'),
nullable=False),
mysql_engine='InnoDB',
mysql_charset='utf8'
)
user_lock = sqlalchemy.Table(
'user_lock', meta,
sqlalchemy.Column('user_id', sqlalchemy.String(36),
primary_key=True, nullable=False),
sqlalchemy.Column('engine_id', sqlalchemy.String(36)),
sqlalchemy.Column('action_id', sqlalchemy.String(36)),
mysql_engine='InnoDB',
mysql_charset='utf8'
)
service = sqlalchemy.Table(
'service', meta,
sqlalchemy.Column('id', sqlalchemy.String(36),
primary_key=True, nullable=False),
sqlalchemy.Column('host', sqlalchemy.String(255)),
sqlalchemy.Column('binary', sqlalchemy.String(255)),
sqlalchemy.Column('topic', sqlalchemy.String(255)),
sqlalchemy.Column('disabled', sqlalchemy.Boolean),
sqlalchemy.Column('disabled_reason', sqlalchemy.String(255)),
sqlalchemy.Column('created_at', sqlalchemy.DateTime),
sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
mysql_engine='InnoDB',
mysql_charset='utf8'
)
@ -124,7 +179,10 @@ def upgrade(migrate_engine):
rule,
resource,
event,
action,
dependency,
user_lock,
service,
)
for index, table in enumerate(tables):

View File

@ -87,17 +87,8 @@ class SoftDelete(object):
class StateAware(object):
status = sqlalchemy.Column('status', sqlalchemy.String(10))
_status_reason = sqlalchemy.Column('status_reason', sqlalchemy.String(255))
@property
def status_reason(self):
return self._status_reason
@status_reason.setter
def status_reason(self, reason):
self._status_reason = reason and reason[:255] or ''
status = sqlalchemy.Column('status', sqlalchemy.String(255))
status_reason = sqlalchemy.Column('status_reason', sqlalchemy.Text)
class User(BASE, BileanBase, SoftDelete, StateAware, models.TimestampMixin):
@ -146,8 +137,7 @@ class Resource(BASE, BileanBase, SoftDelete, models.TimestampMixin):
"""Represents a meta resource with rate"""
__tablename__ = 'resource'
id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True,
nullable=False)
id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True)
user_id = sqlalchemy.Column(
sqlalchemy.String(36),
sqlalchemy.ForeignKey('user.id'),
@ -162,13 +152,47 @@ class Resource(BASE, BileanBase, SoftDelete, models.TimestampMixin):
rate = sqlalchemy.Column(sqlalchemy.Float, nullable=False)
class Action(BASE, BileanBase, StateAware, models.TimestampMixin):
"""Action objects."""
__tablename__ = 'action'
id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True,
default=lambda: UUID4())
name = sqlalchemy.Column(sqlalchemy.String(63))
context = sqlalchemy.Column(types.Dict)
target = sqlalchemy.Column(sqlalchemy.String(36))
action = sqlalchemy.Column(sqlalchemy.String(255))
cause = sqlalchemy.Column(sqlalchemy.String(255))
owner = sqlalchemy.Column(sqlalchemy.String(36))
start_time = sqlalchemy.Column(sqlalchemy.Float(precision='24,8'))
end_time = sqlalchemy.Column(sqlalchemy.Float(precision='24,8'))
timeout = sqlalchemy.Column(sqlalchemy.Integer)
inputs = sqlalchemy.Column(types.Dict)
outputs = sqlalchemy.Column(types.Dict)
data = sqlalchemy.Column(types.Dict)
class ActionDependency(BASE, BileanBase):
"""Action dependencies."""
__tablename__ = 'dependency'
id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True,
default=lambda: UUID4())
depended = sqlalchemy.Column(sqlalchemy.String(36),
sqlalchemy.ForeignKey('action.id'),
nullable=False)
dependent = sqlalchemy.Column(sqlalchemy.String(36),
sqlalchemy.ForeignKey('action.id'),
nullable=False)
class Event(BASE, BileanBase, SoftDelete):
"""Represents an event generated by the bilean engine."""
__tablename__ = 'event'
id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True,
default=lambda: UUID4(), unique=True)
default=lambda: UUID4())
user_id = sqlalchemy.Column(sqlalchemy.String(36),
sqlalchemy.ForeignKey('user.id'),
nullable=False)
@ -184,11 +208,10 @@ class Job(BASE, BileanBase):
__tablename__ = 'job'
id = sqlalchemy.Column(sqlalchemy.String(50), primary_key=True,
unique=True)
id = sqlalchemy.Column(sqlalchemy.String(50), primary_key=True)
scheduler_id = sqlalchemy.Column(sqlalchemy.String(36))
job_type = sqlalchemy.Column(sqlalchemy.String(10))
parameters = sqlalchemy.Column(types.Dict())
parameters = sqlalchemy.Column(types.Dict)
class UserLock(BASE, BileanBase):
@ -196,6 +219,19 @@ class UserLock(BASE, BileanBase):
__tablename__ = 'user_lock'
user_id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True,
nullable=False)
engine_id = sqlalchemy.Column(sqlalchemy.String(36))
user_id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True)
action_id = sqlalchemy.Column(sqlalchemy.String(36))
class Service(BASE, BileanBase, models.TimestampMixin):
"""Service registry."""
__tablename__ = 'service'
id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True,
default=lambda: UUID4())
host = sqlalchemy.Column(sqlalchemy.String(255))
binary = sqlalchemy.Column(sqlalchemy.String(255))
topic = sqlalchemy.Column(sqlalchemy.String(255))
disabled = sqlalchemy.Column(sqlalchemy.Boolean, default=False)
disabled_reason = sqlalchemy.Column(sqlalchemy.String(255))

View File

View File

@ -0,0 +1,398 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
import time
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import timeutils
from bilean.common import context as req_context
from bilean.common import exception
from bilean.common.i18n import _
from bilean.common.i18n import _LE
from bilean.db import api as db_api
wallclock = time.time
LOG = logging.getLogger(__name__)
# Action causes
CAUSES = (
CAUSE_RPC, CAUSE_DERIVED,
) = (
'RPC Request',
'Derived Action',
)
class Action(object):
'''An action can be performed on a user, rule or policy.'''
RETURNS = (
RES_OK, RES_ERROR, RES_RETRY, RES_CANCEL, RES_TIMEOUT,
) = (
'OK', 'ERROR', 'RETRY', 'CANCEL', 'TIMEOUT',
)
# Action status definitions:
# INIT: Not ready to be executed because fields are being modified,
# or dependency with other actions are being analyzed.
# READY: Initialized and ready to be executed by a worker.
# RUNNING: Being executed by a worker thread.
# SUCCEEDED: Completed with success.
# FAILED: Completed with failure.
# CANCELLED: Action cancelled because worker thread was cancelled.
STATUSES = (
INIT, WAITING, READY, RUNNING, SUSPENDED,
SUCCEEDED, FAILED, CANCELLED
) = (
'INIT', 'WAITING', 'READY', 'RUNNING', 'SUSPENDED',
'SUCCEEDED', 'FAILED', 'CANCELLED',
)
# Signal commands
COMMANDS = (
SIG_CANCEL, SIG_SUSPEND, SIG_RESUME,
) = (
'CANCEL', 'SUSPEND', 'RESUME',
)
def __new__(cls, target, action, context, **kwargs):
if (cls != Action):
return super(Action, cls).__new__(cls)
target_type = action.split('_')[0]
if target_type == 'USER':
from bilean.engine.actions import user_action
ActionClass = user_action.UserAction
# elif target_type == 'RULE':
# from bilean.engine.actions import rule_action
# ActionClass = rule_action.RuleAction
# elif target_type == 'POLICY':
# from bilean.engine.actions import policy_action
# ActionClass = policy_action.PolicyAction
return super(Action, cls).__new__(ActionClass)
def __init__(self, target, action, context, **kwargs):
# context will be persisted into database so that any worker thread
# can pick the action up and execute it on behalf of the initiator
self.id = kwargs.get('id', None)
self.name = kwargs.get('name', '')
self.context = context
self.action = action
self.target = target
# Why this action is fired, it can be a UUID of another action
self.cause = kwargs.get('cause', '')
# Owner can be an UUID format ID for the worker that is currently
# working on the action. It also serves as a lock.
self.owner = kwargs.get('owner', None)
self.start_time = kwargs.get('start_time', None)
self.end_time = kwargs.get('end_time', None)
# Timeout is a placeholder in case some actions may linger too long
self.timeout = kwargs.get('timeout', cfg.CONF.default_action_timeout)
# Return code, useful when action is not automatically deleted
# after execution
self.status = kwargs.get('status', self.INIT)
self.status_reason = kwargs.get('status_reason', '')
# All parameters are passed in using keyword arguments which is
# a dictionary stored as JSON in DB
self.inputs = kwargs.get('inputs', {})
self.outputs = kwargs.get('outputs', {})
self.created_at = kwargs.get('created_at', None)
self.updated_at = kwargs.get('updated_at', None)
self.data = kwargs.get('data', {})
def store(self, context):
"""Store the action record into database table.
:param context: An instance of the request context.
:return: The ID of the stored object.
"""
timestamp = timeutils.utcnow()
values = {
'name': self.name,
'context': self.context.to_dict(),
'target': self.target,
'action': self.action,
'cause': self.cause,
'owner': self.owner,
'start_time': self.start_time,
'end_time': self.end_time,
'timeout': self.timeout,
'status': self.status,
'status_reason': self.status_reason,
'inputs': self.inputs,
'outputs': self.outputs,
'created_at': self.created_at,
'updated_at': self.updated_at,
'data': self.data,
}
if self.id:
self.updated_at = timestamp
values['updated_at'] = timestamp
db_api.action_update(context, self.id, values)
else:
self.created_at = timestamp
values['created_at'] = timestamp
action = db_api.action_create(context, values)
self.id = action.id
return self.id
@classmethod
def _from_db_record(cls, record):
"""Construct a action object from database record.
:param context: the context used for DB operations;
:param record: a DB action object that contains all fields.
:return: An `Action` object deserialized from the DB action object.
"""
context = req_context.RequestContext.from_dict(record.context)
kwargs = {
'id': record.id,
'name': record.name,
'cause': record.cause,
'owner': record.owner,
'start_time': record.start_time,
'end_time': record.end_time,
'timeout': record.timeout,
'status': record.status,
'status_reason': record.status_reason,
'inputs': record.inputs or {},
'outputs': record.outputs or {},
'created_at': record.created_at,
'updated_at': record.updated_at,
'data': record.data,
}
return cls(record.target, record.action, context, **kwargs)
@classmethod
def load(cls, context, action_id=None, db_action=None):
"""Retrieve an action from database.
:param context: Instance of request context.
:param action_id: An UUID for the action to deserialize.
:param db_action: An action object for the action to deserialize.
:return: A `Action` object instance.
"""
if db_action is None:
db_action = db_api.action_get(context, action_id)
if db_action is None:
raise exception.ActionNotFound(action=action_id)
return cls._from_db_record(db_action)
@classmethod
def load_all(cls, context, filters=None, limit=None, marker=None,
sort_keys=None, sort_dir=None):
"""Retrieve all actions from database."""
records = db_api.action_get_all(context, filters=filters,
limit=limit, marker=marker,
sort_keys=sort_keys,
sort_dir=sort_dir)
for record in records:
yield cls._from_db_record(record)
@classmethod
def create(cls, context, target, action, **kwargs):
"""Create an action object.
:param context: The requesting context.
:param target: The ID of the target.
:param action: Name of the action.
:param dict kwargs: Other keyword arguments for the action.
:return: ID of the action created.
"""
params = {
'user': context.user,
'project': context.project,
'domain': context.domain,
'is_admin': context.is_admin,
'request_id': context.request_id,
'trusts': context.trusts,
}
ctx = req_context.RequestContext.from_dict(params)
obj = cls(target, action, ctx, **kwargs)
return obj.store(context)
@classmethod
def delete(cls, context, action_id):
"""Delete an action from database."""
db_api.action_delete(context, action_id)
def signal(self, cmd):
'''Send a signal to the action.'''
if cmd not in self.COMMANDS:
return
if cmd == self.SIG_CANCEL:
expected_statuses = (self.INIT, self.WAITING, self.READY,
self.RUNNING)
elif cmd == self.SIG_SUSPEND:
expected_statuses = (self.RUNNING)
else: # SIG_RESUME
expected_statuses = (self.SUSPENDED)
if self.status not in expected_statuses:
msg = _LE("Action (%(action)s) is in unexpected status "
"(%(actual)s) while expected status should be one of "
"(%(expected)s).") % dict(action=self.id,
expected=expected_statuses,
actual=self.status)
LOG.error(msg)
return
db_api.action_signal(self.context, self.id, cmd)
def execute(self, **kwargs):
'''Execute the action.
In theory, the action encapsulates all information needed for
execution. 'kwargs' may specify additional parameters.
:param kwargs: additional parameters that may override the default
properties stored in the action record.
'''
return NotImplemented
def set_status(self, result, reason=None):
"""Set action status based on return value from execute."""
timestamp = wallclock()
if result == self.RES_OK:
status = self.SUCCEEDED
db_api.action_mark_succeeded(self.context, self.id, timestamp)
elif result == self.RES_ERROR:
status = self.FAILED
db_api.action_mark_failed(self.context, self.id, timestamp,
reason=reason or 'ERROR')
elif result == self.RES_TIMEOUT:
status = self.FAILED
db_api.action_mark_failed(self.context, self.id, timestamp,
reason=reason or 'TIMEOUT')
elif result == self.RES_CANCEL:
status = self.CANCELLED
db_api.action_mark_cancelled(self.context, self.id, timestamp)
else: # result == self.RES_RETRY:
status = self.READY
# Action failed at the moment, but can be retried
# We abandon it and then notify other dispatchers to execute it
db_api.action_abandon(self.context, self.id)
self.status = status
self.status_reason = reason
def get_status(self):
timestamp = wallclock()
status = db_api.action_check_status(self.context, self.id, timestamp)
self.status = status
return status
def is_timeout(self):
time_lapse = wallclock() - self.start_time
return time_lapse > self.timeout
def _check_signal(self):
# Check timeout first, if true, return timeout message
if self.timeout is not None and self.is_timeout():
return self.RES_TIMEOUT
result = db_api.action_signal_query(self.context, self.id)
return result
def is_cancelled(self):
return self._check_signal() == self.SIG_CANCEL
def is_suspended(self):
return self._check_signal() == self.SIG_SUSPEND
def is_resumed(self):
return self._check_signal() == self.SIG_RESUME
def to_dict(self):
if self.id:
dep_on = db_api.dependency_get_depended(self.context, self.id)
dep_by = db_api.dependency_get_dependents(self.context, self.id)
else:
dep_on = []
dep_by = []
action_dict = {
'id': self.id,
'name': self.name,
'action': self.action,
'target': self.target,
'cause': self.cause,
'owner': self.owner,
'start_time': self.start_time,
'end_time': self.end_time,
'timeout': self.timeout,
'status': self.status,
'status_reason': self.status_reason,
'inputs': self.inputs,
'outputs': self.outputs,
'depends_on': dep_on,
'depended_by': dep_by,
'created_at': self.created_at,
'updated_at': self.updated_at,
'data': self.data,
}
return action_dict
def ActionProc(context, action_id):
'''Action process.'''
action = Action.load(context, action_id=action_id)
if action is None:
LOG.error(_LE('Action "%s" could not be found.'), action_id)
return False
reason = 'Action completed'
success = True
try:
result, reason = action.execute()
except Exception as ex:
result = action.RES_ERROR
reason = six.text_type(ex)
LOG.exception(_('Unexpected exception occurred during action '
'%(action)s (%(id)s) execution: %(reason)s'),
{'action': action.action, 'id': action.id,
'reason': reason})
success = False
finally:
# NOTE: locks on action is eventually released here by status update
action.set_status(result, reason)
return success

View File

@ -0,0 +1,153 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
from bilean.common import exception
from bilean.common.i18n import _
from bilean.common.i18n import _LE
from bilean.common.i18n import _LI
from bilean.engine.actions import base
from bilean.engine.flows import flow as bilean_flow
from bilean.engine import lock as bilean_lock
from bilean.resources import base as resource_base
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
class UserAction(base.Action):
"""An action that can be performed on a user."""
ACTIONS = (
USER_CREATE_RESOURCE, USER_UPDATE_RESOUCE, USER_DELETE_RESOURCE,
USER_SETTLE_ACCOUNT,
) = (
'USER_CREATE_RESOURCE', 'USER_UPDATE_RESOUCE', 'USER_DELETE_RESOURCE',
'USER_SETTLE_ACCOUNT',
)
def do_create_resource(self):
resource = resource_base.Resource.from_dict(self.inputs)
try:
flow_engine = bilean_flow.get_flow(self.context,
resource,
'create')
with bilean_flow.DynamicLogListener(flow_engine, logger=LOG):
flow_engine.run()
except Exception as ex:
LOG.error(_LE("Faied to execute action(%(action_id)s), error: "
"%(error_msg)s"), {"action_id": self.id,
"error_msg": six.text_type(ex)})
return self.RES_ERROR, _('Resource creation failed.')
return self.RES_OK, _('Resource creation successfully.')
def do_update_resource(self):
try:
resource_id = self.inputs.get('id')
resource = resource_base.Resource.load(
self.context, resource_id=resource_id)
except exception.ResourceNotFound:
LOG.error(_LE('The resource(%s) trying to update not found.'),
resource_id)
return self.RES_ERROR, _('Resource not found.')
try:
flow_engine = bilean_flow.get_flow(self.context,
resource,
'update')
with bilean_flow.DynamicLogListener(flow_engine, logger=LOG):
flow_engine.run()
except Exception as ex:
LOG.error(_LE("Faied to execute action(%(action_id)s), error: "
"%(error_msg)s"), {"action_id": self.id,
"error_msg": six.text_type(ex)})
return self.RES_ERROR, _('Resource update failed.')
LOG.info(_LI('Successfully updated resource: %s'), resource.id)
return self.RES_OK, _('Resource update successfully.')
def do_delete_resource(self):
try:
resource_id = self.inputs.get('resource_id')
resource = resource_base.Resource.load(
self.context, resource_id=resource_id)
except exception.ResourceNotFound:
LOG.error(_LE('The resource(%s) trying to delete not found.'),
resource_id)
return self.RES_ERROR, _('Resource not found.')
try:
flow_engine = bilean_flow.get_flow(self.context,
resource,
'delete')
with bilean_flow.DynamicLogListener(flow_engine, logger=LOG):
flow_engine.run()
except Exception as ex:
LOG.error(_LE("Faied to execute action(%(action_id)s), error: "
"%(error_msg)s"), {"action_id": self.id,
"error_msg": six.text_type(ex)})
return self.RES_ERROR, _('Resource deletion failed.')
LOG.info(_LI('Successfully deleted resource: %s'), resource.id)
return self.RES_OK, _('Resource deletion successfully.')
def do_settle_account(self):
try:
flow_engine = bilean_flow.get_settle_account_flow(
self.context, self.target, task=self.inputs.get('task'))
with bilean_flow.DynamicLogListener(flow_engine, logger=LOG):
flow_engine.run()
except Exception as ex:
LOG.error(_LE("Faied to execute action(%(action_id)s), error: "
"%(error_msg)s"), {"action_id": self.id,
"error_msg": six.text_type(ex)})
return self.RES_ERROR, _('Settle account failed.')
return self.RES_OK, _('Settle account successfully.')
def _execute(self):
"""Private function that finds out the handler and execute it."""
action_name = self.action.lower()
method_name = action_name.replace('user', 'do')
method = getattr(self, method_name, None)
if method is None:
reason = _('Unsupported action: %s') % self.action
return self.RES_ERROR, reason
return method()
def execute(self, **kwargs):
"""Interface function for action execution.
:param dict kwargs: Parameters provided to the action, if any.
:returns: A tuple containing the result and the related reason.
"""
try:
res = bilean_lock.user_lock_acquire(self.context, self.target,
self.id, self.owner)
if not res:
LOG.error(_LE('Failed grabbing the lock for user: %s'),
self.target)
res = self.RES_ERROR
reason = _('Failed in locking user')
else:
res, reason = self._execute()
finally:
bilean_lock.user_lock_release(self.target, self.id)
return res, reason

112
bilean/engine/dispatcher.py Normal file
View File

@ -0,0 +1,112 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_context import context as oslo_context
from oslo_log import log as logging
import oslo_messaging
from oslo_service import service
from bilean.common import consts
from bilean.common.i18n import _LI
from bilean.common import messaging as rpc_messaging
LOG = logging.getLogger(__name__)
OPERATIONS = (
START_ACTION, CANCEL_ACTION, STOP
) = (
'start_action', 'cancel_action', 'stop'
)
class Dispatcher(service.Service):
'''Listen on an AMQP queue named for the engine.
Receive notification from engine services and schedule actions.
'''
def __init__(self, engine_service, topic, version, thread_group_mgr):
super(Dispatcher, self).__init__()
self.TG = thread_group_mgr
self.engine_id = engine_service.engine_id
self.topic = topic
self.version = version
def start(self):
super(Dispatcher, self).start()
self.target = oslo_messaging.Target(server=self.engine_id,
topic=self.topic,
version=self.version)
server = rpc_messaging.get_rpc_server(self.target, self)
server.start()
def listening(self, ctxt):
'''Respond affirmatively to confirm that engine is still alive.'''
return True
def start_action(self, ctxt, action_id=None):
self.TG.start_action(self.engine_id, action_id)
def cancel_action(self, ctxt, action_id):
'''Cancel an action.'''
self.TG.cancel_action(action_id)
def suspend_action(self, ctxt, action_id):
'''Suspend an action.'''
self.TG.suspend_action(action_id)
def resume_action(self, ctxt, action_id):
'''Resume an action.'''
self.TG.resume_action(action_id)
def stop(self):
super(Dispatcher, self).stop()
# Wait for all action threads to be finished
LOG.info(_LI("Stopping all action threads of engine %s"),
self.engine_id)
# Stop ThreadGroup gracefully
self.TG.stop(True)
LOG.info(_LI("All action threads have been finished"))
def notify(method, engine_id=None, **kwargs):
'''Send notification to dispatcher
:param method: remote method to call
:param engine_id: dispatcher to notify; None implies broadcast
'''
client = rpc_messaging.get_rpc_client(version=consts.RPC_API_VERSION)
if engine_id:
# Notify specific dispatcher identified by engine_id
call_context = client.prepare(
version=consts.RPC_API_VERSION,
topic=consts.ENGINE_DISPATCHER_TOPIC,
server=engine_id)
else:
# Broadcast to all disptachers
call_context = client.prepare(
version=consts.RPC_API_VERSION,
topic=consts.ENGINE_DISPATCHER_TOPIC)
try:
# We don't use ctext parameter in action progress
# actually. But since RPCClient.call needs this param,
# we use oslo current context here.
call_context.call(oslo_context.get_current(), method, **kwargs)
return True
except oslo_messaging.MessagingTimeout:
return False
def start_action(engine_id=None, **kwargs):
return notify(START_ACTION, engine_id, **kwargs)

View File

@ -113,38 +113,56 @@ class Event(object):
return evt
def record(context, user_id, action=None, seconds=0, value=0):
def record(context, user, timestamp=None, action='charge', cause_resource=None,
resource_action=None, extra_cost=0, value=0):
"""Generate events for specify user
:param context: oslo.messaging.context
:param user_id: ID of user to mark event
:param user: object user to mark event
:param action: action of event, include 'charge' and 'recharge'
:param seconds: use time length, needed when action is 'charge'
:param cause_resource: object resource which triggered the action
:param resource_action: action of resource
:param extra_cost: extra cost of the resource
:param timestamp: timestamp when event occurs
:param value: value of recharge, needed when action is 'recharge'
"""
if timestamp is None:
timestamp = timeutils.utcnow()
try:
if action == 'charge':
resources = resource_base.Resource.load_all(
context, user_id=user_id, project_safe=False)
context, user_id=user.id, project_safe=False)
seconds = (timestamp - user.last_bill).total_seconds()
res_mapping = {}
for resource in resources:
usage = resource.rate * seconds
if cause_resource and resource.id == cause_resource.id:
if resource_action == 'create':
usage = extra_cost
elif resource_action == 'update':
usage = resource.rate * seconds + extra_cost
else:
usage = resource.rate * seconds
if res_mapping.get(resource.resource_type) is None:
res_mapping[resource.resource_type] = usage
else:
res_mapping[resource.resource_type] += usage
if resource_action == 'delete':
usage = cause_resource.rate * seconds + extra_cost
if res_mapping.get(cause_resource.resource_type) is None:
res_mapping[cause_resource.resource_type] = 0
res_mapping[cause_resource.resource_type] += usage
for res_type in res_mapping.keys():
event = Event(timeutils.utcnow(),
user_id=user_id,
event = Event(timestamp,
user_id=user.id,
action=action,
resource_type=res_type,
value=res_mapping.get(res_type))
event.store(context)
elif action == 'recharge':
event = Event(timeutils.utcnow(),
user_id=user_id,
event = Event(timestamp,
user_id=user.id,
action=action,
value=value)
event.store(context)

View File

220
bilean/engine/flows/flow.py Normal file
View File

@ -0,0 +1,220 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from oslo_log import log as logging
import taskflow.engines
from taskflow.listeners import base
from taskflow.listeners import logging as logging_listener
from taskflow.patterns import linear_flow
from taskflow import task
from taskflow.types import failure as ft
from bilean.common import exception
from bilean.common.i18n import _LE
from bilean.engine import policy as policy_mod
from bilean.engine import user as user_mod
from bilean.resources import base as resource_base
from bilean.rules import base as rule_base
from bilean import scheduler as bilean_scheduler
LOG = logging.getLogger(__name__)
class DynamicLogListener(logging_listener.DynamicLoggingListener):
"""This is used to attach to taskflow engines while they are running.
It provides a bunch of useful features that expose the actions happening
inside a taskflow engine, which can be useful for developers for debugging,
for operations folks for monitoring and tracking of the resource actions
and more...
"""
#: Exception is an excepted case, don't include traceback in log if fails.
_NO_TRACE_EXCEPTIONS = (exception.InvalidInput)
def __init__(self, engine,
task_listen_for=base.DEFAULT_LISTEN_FOR,
flow_listen_for=base.DEFAULT_LISTEN_FOR,
retry_listen_for=base.DEFAULT_LISTEN_FOR,
logger=LOG):
super(DynamicLogListener, self).__init__(
engine,
task_listen_for=task_listen_for,
flow_listen_for=flow_listen_for,
retry_listen_for=retry_listen_for,
log=logger)
def _format_failure(self, fail):
if fail.check(*self._NO_TRACE_EXCEPTIONS) is not None:
exc_info = None
exc_details = '%s%s' % (os.linesep, fail.pformat(traceback=False))
return (exc_info, exc_details)
else:
return super(DynamicLogListener, self)._format_failure(fail)
class CreateResourceTask(task.Task):
"""Create resource and store to db."""
def execute(self, context, resource, **kwargs):
user = user_mod.User.load(context, user_id=resource.user_id)
user_policy = policy_mod.Policy.load(context, policy_id=user.policy_id)
rule = user_policy.find_rule(context, resource.resource_type)
# Update resource with rule_id and rate
resource.rule_id = rule.id
resource.rate = rule.get_price(resource)
resource.store(context)
def revert(self, context, resource, result, **kwargs):
if isinstance(result, ft.Failure):
LOG.error(_LE("Error when creating resource: %s"),
resource.to_dict())
return
resource.delete(context, soft_delete=False)
class UpdateResourceTask(task.Task):
"""Update resource."""
def execute(self, context, resource, values, resource_bak, **kwargs):
old_rate = resource.rate
resource.properties = values.get('properties')
rule = rule_base.Rule.load(context, rule_id=resource.rule_id)
resource.rate = rule.get_price(resource)
resource.d_rate = resource.rate - old_rate
resource.store(context)
def revert(self, context, resource, resource_bak, result, **kwargs):
if isinstance(result, ft.Failure):
LOG.error(_LE("Error when updating resource: %s"), resource.id)
return
# restore resource
res = resource_base.Resource.from_dict(resource_bak)
res.store(context)
class DeleteResourceTask(task.Task):
"""Delete resource from db."""
def execute(self, context, resource, **kwargs):
resource.delete(context)
def revert(self, context, resource, result, **kwargs):
if isinstance(result, ft.Failure):
LOG.error(_LE("Error when deleting resource: %s"), resource.id)
return
resource.deleted_at = None
resource.store(context)
class LoadUserTask(task.Task):
"""Load user from db."""
default_provides = set(['user_bak', 'user_obj'])
def execute(self, context, user_id, **kwargs):
user_obj = user_mod.User.load(context, user_id=user_id)
return {
'user_bak': user_obj.to_dict(),
'user_obj': user_obj,
}
class SettleAccountTask(task.Task):
def execute(self, context, user_obj, user_bak, task, **kwargs):
user_obj.settle_account(context, task=task)
def revert(self, context, user_bak, result, **kwargs):
if isinstance(result, ft.Failure):
LOG.error(_LE("Error when settling account for user: %s"),
user_bak.get('id'))
return
# Restore user
user = user_mod.User.from_dict(user_bak)
user.store(context)
class UpdateUserWithResourceTask(task.Task):
"""Update user with resource actions."""
def execute(self, context, user_obj, user_bak, resource,
resource_action, **kwargs):
user_obj.update_with_resource(context, resource,
resource_action=resource_action)
def revert(self, context, user_bak, result, **kwargs):
if isinstance(result, ft.Failure):
LOG.error(_LE("Error when updating user: %s"), user_bak.get('id'))
return
# Restore user
user = user_mod.User.from_dict(user_bak)
user.store(context)
class UpdateUserJobsTask(task.Task):
"""Update user jobs."""
def execute(self, user_obj, **kwargs):
res = bilean_scheduler.notify(bilean_scheduler.UPDATE_JOBS,
user=user_obj.to_dict())
if not res:
LOG.error(_LE("Error when updating user jobs: %s"), user_obj.id)
raise
def get_settle_account_flow(context, user_id, task=None):
"""Constructs and returns settle account task flow."""
flow_name = user_id + '_settle_account'
flow = linear_flow.Flow(flow_name)
kwargs = {
'context': context,
'user_id': user_id,
'task': task,
}
flow.add(LoadUserTask(),
SettleAccountTask())
if task != 'freeze':
flow.add(UpdateUserJobsTask())
return taskflow.engines.load(flow, store=kwargs)
def get_flow(context, resource, resource_action):
"""Constructs and returns resource task flow."""
flow_name = resource.user_id + '_' + resource_action + '_resource'
flow = linear_flow.Flow(flow_name)
kwargs = {
'context': context,
'user_id': resource.user_id,
'resource': resource,
'resource_action': resource_action,
}
if resource_action == 'create':
flow.add(CreateResourceTask())
if resource_action == 'update':
flow.add(UpdateResourceTask())
kwargs['resource_bak'] = resource.to_dict()
elif resource_action == 'delete':
flow.add(DeleteResourceTask())
flow.add(LoadUserTask(),
UpdateUserWithResourceTask(),
UpdateUserJobsTask())
return taskflow.engines.load(flow, store=kwargs)

View File

@ -13,8 +13,12 @@
import eventlet
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import timeutils
import time
from bilean.common.i18n import _
from bilean.common.i18n import _LE
from bilean.common.i18n import _LI
from bilean.db import api as db_api
CONF = cfg.CONF
@ -25,22 +29,40 @@ CONF.import_opt('lock_retry_interval', 'bilean.common.config')
LOG = logging.getLogger(__name__)
def is_engine_dead(ctx, engine_id, period_time=None):
# if engine didn't report its status for peirod_time, will consider it
# as a dead engine.
if period_time is None:
period_time = 2 * CONF.periodic_interval
engine = db_api.service_get(ctx, engine_id)
if not engine:
return True
if (timeutils.utcnow() - engine.updated_at).total_seconds() > period_time:
return True
return False
def sleep(sleep_time):
'''Interface for sleeping.'''
eventlet.sleep(sleep_time)
def user_lock_acquire(user_id, engine_id):
def user_lock_acquire(context, user_id, action_id, engine=None,
forced=False):
"""Try to lock the specified user.
:param context: the context used for DB operations;
:param user_id: ID of the user to be locked.
:param engine_id: ID of the engine which wants to lock the user.
:param action_id: ID of the action that attempts to lock the user.
:param engine: ID of the engine that attempts to lock the user.
:param forced: set to True to cancel current action that owns the lock,
if any.
:returns: True if lock is acquired, or False otherwise.
"""
user_lock = db_api.user_lock_acquire(user_id, engine_id)
if user_lock:
owner = db_api.user_lock_acquire(user_id, action_id)
if action_id == owner:
return True
retries = cfg.CONF.lock_retry_times
@ -49,18 +71,40 @@ def user_lock_acquire(user_id, engine_id):
while retries > 0:
sleep(retry_interval)
LOG.debug(_('Acquire lock for user %s again'), user_id)
user_lock = db_api.user_lock_acquire(user_id, engine_id)
if user_lock:
owner = db_api.user_lock_acquire(user_id, action_id)
if action_id == owner:
return True
retries = retries - 1
if forced:
owner = db_api.user_lock_steal(user_id, action_id)
return action_id == owner
action = db_api.action_get(context, owner)
if (action and action.owner and action.owner != engine and
is_engine_dead(context, action.owner)):
LOG.info(_LI('The user %(u)s is locked by dead action %(a)s, '
'try to steal the lock.'), {
'u': user_id,
'a': owner
})
reason = _('Engine died when executing this action.')
db_api.action_mark_failed(context, action.id, time.time(),
reason=reason)
db_api.user_lock_steal(user_id, action_id)
return True
LOG.error(_LE('User is already locked by action %(old)s, '
'action %(new)s failed grabbing the lock'),
{'old': owner, 'new': action_id})
return False
def user_lock_release(user_id, engine_id=None):
def user_lock_release(user_id, action_id):
"""Release the lock on the specified user.
:param user_id: ID of the user to be released.
:param engine_id: ID of the engine which locked the user.
:param action_id: ID of the action which locked the user.
"""
return db_api.user_lock_release(user_id, engine_id=engine_id)
return db_api.user_lock_release(user_id, action_id)

View File

@ -13,11 +13,14 @@
import functools
import six
import socket
import time
import eventlet
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
from oslo_service import service
from oslo_service import threadgroup
from bilean.common import consts
from bilean.common import context as bilean_context
@ -28,9 +31,11 @@ from bilean.common.i18n import _LI
from bilean.common import messaging as rpc_messaging
from bilean.common import schema
from bilean.common import utils
from bilean.db import api as db_api
from bilean.engine.actions import base as action_mod
from bilean.engine import dispatcher
from bilean.engine import environment
from bilean.engine import event as event_mod
from bilean.engine import lock as bilean_lock
from bilean.engine import policy as policy_mod
from bilean.engine import user as user_mod
from bilean.resources import base as resource_base
@ -53,6 +58,112 @@ def request_context(func):
return wrapped
class ThreadGroupManager(object):
'''Thread group manager.'''
def __init__(self):
super(ThreadGroupManager, self).__init__()
self.workers = {}
self.group = threadgroup.ThreadGroup()
# Create dummy service task, because when there is nothing queued
# on self.tg the process exits
self.add_timer(cfg.CONF.periodic_interval, self._service_task)
self.db_session = bilean_context.get_admin_context()
def _service_task(self):
'''Dummy task which gets queued on the service.Service threadgroup.
Without this service.Service sees nothing running i.e has nothing to
wait() on, so the process exits.
'''
pass
def start(self, func, *args, **kwargs):
'''Run the given method in a thread.'''
return self.group.add_thread(func, *args, **kwargs)
def start_action(self, worker_id, action_id=None):
'''Run the given action in a sub-thread.
Release the action lock when the thread finishes?
:param workder_id: ID of the worker thread; we fake workers using
bilean engines at the moment.
:param action_id: ID of the action to be executed. None means the
1st ready action will be scheduled to run.
'''
def release(thread, action_id):
'''Callback function that will be passed to GreenThread.link().'''
# Remove action thread from thread list
self.workers.pop(action_id)
timestamp = time.time()
if action_id is not None:
action = db_api.action_acquire(self.db_session, action_id,
worker_id, timestamp)
else:
action = db_api.action_acquire_first_ready(self.db_session,
worker_id,
timestamp)
if not action:
return
th = self.start(action_mod.ActionProc, self.db_session, action.id)
self.workers[action.id] = th
th.link(release, action.id)
return th
def cancel_action(self, action_id):
'''Cancel an action execution progress.'''
action = action_mod.Action.load(self.db_session, action_id)
action.signal(action.SIG_CANCEL)
def suspend_action(self, action_id):
'''Suspend an action execution progress.'''
action = action_mod.Action.load(self.db_session, action_id)
action.signal(action.SIG_SUSPEND)
def resume_action(self, action_id):
'''Resume an action execution progress.'''
action = action_mod.Action.load(self.db_session, action_id)
action.signal(action.SIG_RESUME)
def add_timer(self, interval, func, *args, **kwargs):
'''Define a periodic task to be run in the thread group.
The task will be executed in a separate green thread.
Interval is from cfg.CONF.periodic_interval
'''
self.group.add_timer(interval, func, *args, **kwargs)
def stop_timers(self):
self.group.stop_timers()
def stop(self, graceful=False):
'''Stop any active threads belong to this threadgroup.'''
# Try to stop all threads gracefully
self.group.stop(graceful)
self.group.wait()
# Wait for link()ed functions (i.e. lock release)
threads = self.group.threads[:]
links_done = dict((th, False) for th in threads)
def mark_done(gt, th):
links_done[th] = True
for th in threads:
th.link(mark_done, th)
while not all(links_done.values()):
eventlet.sleep()
class EngineService(service.Service):
"""Manages the running instances from creation to destruction.
@ -68,13 +179,36 @@ class EngineService(service.Service):
super(EngineService, self).__init__()
self.host = host
self.topic = topic
self.dispatcher_topic = consts.ENGINE_DISPATCHER_TOPIC
self.engine_id = None
self.TG = None
self.target = None
self._rpc_server = None
def _init_service(self):
admin_context = bilean_context.get_admin_context()
srv = db_api.service_get_by_host_and_binary(admin_context,
self.host,
'bilean-engine')
if srv is None:
srv = db_api.service_create(admin_context,
host=self.host,
binary='bilean-engine',
topic=self.topic)
self.engine_id = srv.id
def start(self):
self.engine_id = socket.gethostname()
self._init_service()
self.TG = ThreadGroupManager()
# create a dispatcher RPC service for this engine.
self.dispatcher = dispatcher.Dispatcher(self,
self.dispatcher_topic,
consts.RPC_API_VERSION,
self.TG)
LOG.info(_LI("Starting dispatcher for engine %s"), self.engine_id)
self.dispatcher.start()
LOG.info(_LI("Starting rpc server for engine: %s"), self.engine_id)
target = oslo_messaging.Target(version=consts.RPC_API_VERSION,
@ -84,6 +218,9 @@ class EngineService(service.Service):
self._rpc_server = rpc_messaging.get_rpc_server(target, self)
self._rpc_server.start()
self.TG.add_timer(cfg.CONF.periodic_interval,
self.service_manage_report)
super(EngineService, self).start()
def _stop_rpc_server(self):
@ -99,8 +236,22 @@ class EngineService(service.Service):
def stop(self):
self._stop_rpc_server()
# Notify dispatcher to stop all action threads it started.
LOG.info(_LI("Stopping dispatcher for engine %s"), self.engine_id)
self.dispatcher.stop()
self.TG.stop()
super(EngineService, self).stop()
def service_manage_report(self):
admin_context = bilean_context.get_admin_context()
try:
db_api.service_update(admin_context, self.engine_id)
except Exception as ex:
LOG.error(_LE('Service %(id)s update failed: %(error)s'),
{'id': self.engine_id, 'error': six.text_type(ex)})
@request_context
def user_list(self, cnxt, show_deleted=False, limit=None,
marker=None, sort_keys=None, sort_dir=None,
@ -136,19 +287,12 @@ class EngineService(service.Service):
@request_context
def user_recharge(self, cnxt, user_id, value):
"""Do recharge for specify user."""
res = bilean_lock.user_lock_acquire(user_id, self.engine_id)
if not res:
LOG.error(_LE('Failed grabbing the lock for user %s'), res.user_id)
return False
try:
user = user_mod.User.load(cnxt, user_id=user_id)
user.do_recharge(cnxt, value)
# As user has been updated, the billing job for the user
# should to be updated too.
bilean_scheduler.notify(bilean_scheduler.UPDATE_JOBS,
user=user.to_dict())
finally:
bilean_lock.user_lock_release(user_id, engine_id=self.engine_id)
user = user_mod.User.load(cnxt, user_id=user_id)
user.do_recharge(cnxt, value)
# As user has been updated, the billing job for the user
# should to be updated too.
bilean_scheduler.notify(bilean_scheduler.UPDATE_JOBS,
user=user.to_dict())
return user.to_dict()
@ -176,14 +320,8 @@ class EngineService(service.Service):
'policy': policy_id}
raise exception.BileanBadRequest(msg=msg)
res = bilean_lock.user_lock_acquire(user_id, self.engine_id)
if not res:
LOG.error(_LE('Failed grabbing the lock for user %s'), res.user_id)
return False
user.policy_id = policy_id
user.store(cnxt)
bilean_lock.user_lock_release(user_id, engine_id=self.engine_id)
return user.to_dict()
@request_context
@ -274,44 +412,23 @@ class EngineService(service.Service):
def resource_create(self, cnxt, resource_id, user_id, resource_type,
properties):
"""Create resource by given database
"""Create resource by given data."""
Cause new resource would update user's rate, user update and billing
would be done.
"""
resource = resource_base.Resource(resource_id, user_id, resource_type,
properties)
# Find the exact rule of resource
admin_context = bilean_context.get_admin_context()
user = user_mod.User.load(admin_context, user_id=user_id)
user_policy = policy_mod.Policy.load(
admin_context, policy_id=user.policy_id)
rule = user_policy.find_rule(admin_context, resource_type)
# Update resource with rule_id and rate
resource.rule_id = rule.id
resource.rate = rule.get_price(resource)
params = {
'name': 'create_resource_%s' % resource_id,
'cause': action_mod.CAUSE_RPC,
'status': action_mod.Action.READY,
'inputs': resource.to_dict(),
}
# Update user with resource
res = bilean_lock.user_lock_acquire(user.id, self.engine_id)
if not res:
LOG.error(_LE('Failed grabbing the lock for user %s'), user.id)
return
try:
# Reload user to ensure the info is latest.
user = user_mod.User.load(admin_context, user_id=user_id)
user.update_with_resource(admin_context, resource)
resource.store(admin_context)
# As the rate of user has changed, the billing job for the user
# should change too.
bilean_scheduler.notify(bilean_scheduler.UPDATE_JOBS,
user=user.to_dict())
finally:
bilean_lock.user_lock_release(user.id, engine_id=self.engine_id)
return resource.to_dict()
action_id = action_mod.Action.create(cnxt, user_id,
consts.USER_CREATE_RESOURCE,
**params)
dispatcher.start_action(action_id=action_id)
LOG.info(_LI('Resource create action queued: %s'), action_id)
@request_context
def resource_list(self, cnxt, user_id=None, limit=None, marker=None,
@ -336,59 +453,44 @@ class EngineService(service.Service):
resource = resource_base.Resource.load(cnxt, resource_id=resource_id)
return resource.to_dict()
def resource_update(self, cnxt, resource):
def resource_update(self, cnxt, user_id, resource):
"""Do resource update."""
admin_context = bilean_context.get_admin_context()
res = resource_base.Resource.load(
admin_context, resource_id=resource['id'])
old_rate = res.rate
res.properties = resource['properties']
rule = rule_base.Rule.load(admin_context, rule_id=res.rule_id)
res.rate = rule.get_price(res)
res.d_rate = res.rate - old_rate
result = bilean_lock.user_lock_acquire(res.user_id, self.engine_id)
if not result:
LOG.error(_LE('Failed grabbing the lock for user %s'), res.user_id)
return False
params = {
'name': 'update_resource_%s' % resource.get('id'),
'cause': action_mod.CAUSE_RPC,
'status': action_mod.Action.READY,
'inputs': resource,
}
action_id = action_mod.Action.create(cnxt, user_id,
consts.USER_UPDATE_RESOURCE,
**params)
dispatcher.start_action(action_id=action_id)
LOG.info(_LI('Resource update action queued: %s'), action_id)
def resource_delete(self, cnxt, user_id, resource_id):
"""Delete a specific resource"""
try:
user = user_mod.User.load(admin_context, res.user_id)
user.update_with_resource(admin_context, res, action='update')
res.store(admin_context)
bilean_scheduler.notify(bilean_scheduler.UPDATE_JOBS,
user=user.to_dict())
finally:
bilean_lock.user_lock_release(user.id, engine_id=self.engine_id)
return True
def resource_delete(self, cnxt, resource_id):
"""Do resource delete"""
admin_context = bilean_context.get_admin_context()
try:
res = resource_base.Resource.load(
admin_context, resource_id=resource_id)
resource_base.Resource.load(cnxt, resource_id=resource_id)
except exception.ResourceNotFound:
return False
LOG.error(_LE('The resource(%s) trying to delete not found.'),
resource_id)
return
result = bilean_lock.user_lock_acquire(res.user_id, self.engine_id)
if not result:
LOG.error(_LE('Failed grabbing the lock for user %s'), res.user_id)
return False
try:
user = user_mod.User.load(admin_context, user_id=res.user_id)
user.update_with_resource(admin_context, res, action='delete')
params = {
'name': 'delete_resource_%s' % resource_id,
'cause': action_mod.CAUSE_RPC,
'status': action_mod.Action.READY,
'inputs': {'resource_id': resource_id},
}
bilean_scheduler.notify(bilean_scheduler.UPDATE_JOBS,
user=user.to_dict())
res.delete(admin_context)
finally:
bilean_lock.user_lock_release(user.id, engine_id=self.engine_id)
return True
action_id = action_mod.Action.create(cnxt, user_id,
consts.USER_DELETE_RESOURCE,
**params)
dispatcher.start_action(action_id=action_id)
LOG.info(_LI('Resource delete action queued: %s'), action_id)
@request_context
def event_list(self, cnxt, user_id=None, limit=None, marker=None,
@ -553,14 +655,16 @@ class EngineService(service.Service):
policy_mod.Policy.delete(cnxt, policy_id)
def settle_account(self, cnxt, user_id, task=None):
res = bilean_lock.user_lock_acquire(user_id, self.engine_id)
if not res:
LOG.error(_LE('Failed grabbing the lock for user %s'), user_id)
return
try:
user = user_mod.User.load(cnxt, user_id=user_id)
user.settle_account(cnxt, task=task)
finally:
bilean_lock.user_lock_release(user_id, engine_id=self.engine_id)
params = {
'name': 'settle_account_%s' % user_id,
'cause': action_mod.CAUSE_RPC,
'status': action_mod.Action.READY,
'inputs': {'task': task},
}
return user.to_dict()
action_id = action_mod.Action.create(cnxt, user_id,
consts.USER_SETTLE_ACCOUNT,
**params)
self.TG.start_action(self.engine_id, action_id=action_id)
LOG.info(_LI('User settle_account action queued: %s'), action_id)

View File

@ -169,7 +169,8 @@ class User(object):
@classmethod
def from_dict(cls, values):
return cls(values.get('id'), **values)
id = values.pop('id', None)
return cls(id, **values)
def to_dict(self):
user_dict = {
@ -194,17 +195,37 @@ class User(object):
self.status_reason = reason
self.store(context)
def update_with_resource(self, context, resource, action='create'):
def update_with_resource(self, context, resource,
resource_action='create'):
'''Update user with resource'''
self._settle_account(context)
if 'create' == action:
now = timeutils.utcnow()
extra_cost = 0
if 'create' == resource_action:
d_rate = resource.rate
elif 'delete' == action:
if resource.properties.get('created_at') is not None:
created_at = timeutils.parse_strtime(
resource.properties.get('created_at'))
extra_seconds = (now - created_at).total_seconds()
extra_cost = d_rate * extra_seconds
elif 'delete' == resource_action:
d_rate = -resource.rate
elif 'update' == action:
if resource.properties.get('deleted_at') is not None:
deleted_at = timeutils.parse_strtime(
resource.properties.get('deleted_at'))
extra_seconds = (now - deleted_at).total_seconds()
extra_cost = d_rate * extra_seconds
elif 'update' == resource_action:
d_rate = resource.d_rate
if resource.properties.get('updated_at') is not None:
updated_at = timeutils.parse_strtime(
resource.properties.get('updated_at'))
extra_seconds = (now - updated_at).total_seconds()
extra_cost = d_rate * extra_seconds
self._settle_account(context, extra_cost=extra_cost,
cause_resource=resource,
resource_action=resource_action)
self._change_user_rate(context, d_rate)
self.store(context)
@ -214,20 +235,18 @@ class User(object):
new_rate = old_rate + d_rate
if old_rate == 0 and new_rate > 0:
self.last_bill = timeutils.utcnow()
if d_rate > 0 and self.status == self.FREE:
self.status = self.ACTIVE
elif d_rate < 0:
if new_rate == 0 and self.balance >= 0:
reason = _("Status change to 'FREE' because of resource "
"deleting.")
self.status = self.FREE
elif new_rate == 0 and self.balance < 0:
self.status = self.FREEZE
self.satus_reason = "Balance overdraft"
elif self.status == self.WARNING:
if not self.notify_or_not():
reason = _("Status change from 'warning' to 'active' "
"because of resource deleting.")
self.status = self.ACTIVE
self.status_reason = reason
self.status_reason = reason
elif self.status == self.WARNING and not self.notify_or_not():
reason = _("Status change from 'WARNING' to 'ACTIVE' "
"because of resource deleting.")
self.status = self.ACTIVE
self.status_reason = reason
self.rate = new_rate
def do_recharge(self, context, value):
@ -240,19 +259,19 @@ class User(object):
self.status = self.FREE
self.status_reason = "Recharged"
elif self.status == self.FREEZE and self.balance > 0:
reason = _("Status change from 'freeze' to 'free' because "
reason = _("Status change from 'FREEZE' to 'FREE' because "
"of recharge.")
self.status = self.FREE
self.status_reason = reason
elif self.status == self.WARNING:
if not self.notify_or_not():
reason = _("Status change from 'warning' to 'active' because "
reason = _("Status change from 'WARNING' to 'ACTIVE' because "
"of recharge.")
self.status = self.ACTIVE
self.status_reason = reason
self.store(context)
event_mod.record(context, self.id, action='recharge', value=value)
event_mod.record(context, self, action='recharge', value=value)
def notify_or_not(self):
'''Check if user should be notified.'''
@ -269,33 +288,25 @@ class User(object):
db_api.user_delete(context, self.id)
return True
def _settle_account(self, context):
def _settle_account(self, context, cause_resource=None,
resource_action=None, extra_cost=0):
if self.status not in [self.ACTIVE, self.WARNING]:
LOG.info(_LI("Ignore settlement action because user is in '%s' "
"status."), self.status)
return
now = timeutils.utcnow()
total_seconds = (now - self.last_bill).total_seconds()
cost = self.rate * total_seconds
if cost > 0:
self.balance -= cost
self.last_bill = now
event_mod.record(context, self.id, action='charge',
seconds=total_seconds)
def _freeze(self, context, reason=None):
'''Freeze user when balance overdraft.'''
LOG.info(_LI("Freeze user %(user_id)s, reason: %(reason)s"),
{'user_id': self.id, 'reason': reason})
resources = resource_base.Resource.load_all(
context, user_id=self.id, project_safe=False)
for resource in resources:
if resource.do_delete(context):
self._change_user_rate(context, -resource.rate)
cost = self.rate * total_seconds + extra_cost
self.balance -= cost
event_mod.record(context, self, timestamp=now,
cause_resource=cause_resource,
resource_action=resource_action,
extra_cost=extra_cost)
self.last_bill = now
def settle_account(self, context, task=None):
'''Settle account for user.'''
notifier = bilean_notifier.Notifier()
self._settle_account(context)
@ -306,7 +317,17 @@ class User(object):
msg = {'user': self.id, 'notification': self.status_reason}
notifier.info('billing.notify', msg)
elif task == 'freeze' and self.balance <= 0:
self._freeze(context, reason="Balance overdraft")
reason = _("Balance overdraft")
LOG.info(_LI("Freeze user %(user_id)s, reason: %(reason)s"),
{'user_id': self.id, 'reason': reason})
resources = resource_base.Resource.load_all(
context, user_id=self.id, project_safe=False)
for resource in resources:
resource.do_delete(context)
self.rate = 0
self.status = self.FREEZE
self.status_reason = reason
# Notify user
msg = {'user': self.id, 'notification': self.status_reason}
notifier.info('billing.notify', msg)

View File

@ -86,11 +86,15 @@ class ResourceAction(Action):
def do_update(self):
"""Update a resource"""
return self.rpc_client.resource_update(self.cnxt, self.data)
return self.rpc_client.resource_update(self.cnxt,
self.data.pop('user_id'),
self.data)
def do_delete(self):
"""Delete a resource"""
return self.rpc_client.resource_delete(self.cnxt, self.id)
return self.rpc_client.resource_delete(self.cnxt,
self.user_id,
self.id)
class UserAction(Action):

View File

@ -71,9 +71,9 @@ class Resource(object):
return self.id
def delete(self, context):
def delete(self, context, soft_delete=True):
'''Delete resource from db.'''
db_api.resource_delete(context, self.id)
db_api.resource_delete(context, self.id, soft_delete=soft_delete)
@classmethod
def _from_db_record(cls, record):
@ -123,6 +123,14 @@ class Resource(object):
return [cls._from_db_record(record) for record in records]
@classmethod
def from_dict(cls, values):
id = values.pop('id', None)
user_id = values.pop('user_id', None)
resource_type = values.pop('resource_type', None)
properties = values.pop('properties', {})
return cls(id, user_id, resource_type, properties, **values)
def to_dict(self):
resource_dict = {
'id': self.id,

View File

@ -140,12 +140,14 @@ class EngineClient(object):
resource_type=resource_type,
properties=properties))
def resource_update(self, ctxt, resource):
def resource_update(self, ctxt, user_id, resource):
return self.call(ctxt, self.make_msg('resource_update',
user_id=user_id,
resource=resource))
def resource_delete(self, ctxt, resource_id):
def resource_delete(self, ctxt, user_id, resource_id):
return self.call(ctxt, self.make_msg('resource_delete',
user_id=user_id,
resource_id=resource_id))
# events

View File

@ -94,13 +94,9 @@ class CronScheduler(object):
for job in jobs:
if self._is_exist(job.id):
continue
task_name = "_%s_task" % (job.job_type)
task = getattr(self, task_name)
LOG.info(_LI("Add job '%(job_id)s' to scheduler '%(id)s'."),
{'job_id': job.id, 'id': self.scheduler_id})
tg_type = self.CRON if job.job_type == self.DAILY else self.DAILY
self._add_job(task, job.id, trigger_type=tg_type,
params=job.parameters)
self._add_job(job.id, job.job_type, **job.parameters)
LOG.info(_LI("Initialise users from keystone."))
users = user_mod.User.init_users(admin_context)
@ -113,7 +109,7 @@ class CronScheduler(object):
continue
self._add_daily_job(user)
def _add_job(self, task, job_id, trigger_type='date', **kwargs):
def _add_job(self, job_id, task_type, **kwargs):
"""Add a job to scheduler by given data.
:param str|unicode user_id: used as job_id
@ -123,15 +119,17 @@ class CronScheduler(object):
mg_time = cfg.CONF.scheduler.misfire_grace_time
job_time_zone = cfg.CONF.scheduler.time_zone
user_id = job_id.split('-')[1]
if trigger_type == 'date':
trigger_type = self.CRON if task_type == self.DAILY else self.DATE
if trigger_type == self.DATE:
run_date = kwargs.get('run_date')
if run_date is None:
msg = "Param run_date cannot be None for trigger type 'date'."
raise exception.InvalidInput(reason=msg)
self._scheduler.add_job(task, 'date',
self._scheduler.add_job(self._task, 'date',
timezone=job_time_zone,
run_date=run_date,
args=[user_id],
args=[user_id, task_type],
id=job_id,
misfire_grace_time=mg_time)
return
@ -141,24 +139,14 @@ class CronScheduler(object):
minute = kwargs.get('minute')
if not hour or not minute:
hour, minute = self._generate_timer()
self._scheduler.add_job(task, 'cron',
self._scheduler.add_job(self._task, 'cron',
timezone=job_time_zone,
hour=hour,
minute=minute,
args=[user_id],
args=[user_id, task_type],
id=job_id,
misfire_grace_time=mg_time)
def _modify_job(self, job_id, **changes):
"""Modifies the properties of a single job.
Modifications are passed to this method as extra keyword arguments.
:param str|unicode job_id: the identifier of the job
"""
self._scheduler.modify_job(job_id, **changes)
def _remove_job(self, job_id):
"""Removes a job, preventing it from being run any more.
@ -177,40 +165,19 @@ class CronScheduler(object):
job = self._scheduler.get_job(job_id)
return job is not None
def _notify_task(self, user_id):
def _task(self, user_id, task_type):
admin_context = bilean_context.get_admin_context()
user = self.rpc_client.settle_account(
admin_context, user_id, task=self.NOTIFY)
user_obj = user_mod.User.from_dict(user)
try:
db_api.job_delete(
admin_context, self._generate_job_id(user_id, self.NOTIFY))
except exception.NotFound as e:
LOG.warn(_("Failed in deleting job: %s") % six.text_type(e))
self.update_jobs(user_obj)
def _daily_task(self, user_id):
admin_context = bilean_context.get_admin_context()
user = self.rpc_client.settle_account(
admin_context, user_id, task=self.DAILY)
user_obj = user_mod.User.from_dict(user)
self.update_jobs(user_obj)
def _freeze_task(self, user_id):
admin_context = bilean_context.get_admin_context()
user = self.rpc_client.settle_account(
admin_context, user_id, task=self.FREEZE)
user_obj = user_mod.User.from_dict(user)
try:
db_api.job_delete(
admin_context, self._generate_job_id(user_id, self.FREEZE))
except exception.NotFound as e:
LOG.warn(_("Failed in deleting job: %s") % six.text_type(e))
self.update_jobs(user_obj)
self.rpc_client.settle_account(
admin_context, user_id, task=task_type)
if task_type != self.DAILY:
try:
db_api.job_delete(
admin_context, self._generate_job_id(user_id, task_type))
except exception.NotFound as e:
LOG.warn(_("Failed in deleting job: %s") % six.text_type(e))
def _add_notify_job(self, user):
if not user.rate:
if user.rate == 0:
return False
total_seconds = user.balance / user.rate
prior_notify_time = cfg.CONF.scheduler.prior_notify_time * 3600
@ -219,7 +186,7 @@ class CronScheduler(object):
run_date = timeutils.utcnow() + timedelta(seconds=notify_seconds)
job_params = {'run_date': run_date}
job_id = self._generate_job_id(user.id, self.NOTIFY)
self._add_job(self._notify_task, job_id, **job_params)
self._add_job(job_id, self.NOTIFY, **job_params)
# Save job to database
job = {'id': job_id,
'job_type': self.NOTIFY,
@ -229,13 +196,14 @@ class CronScheduler(object):
db_api.job_create(admin_context, job)
def _add_freeze_job(self, user):
if not user.rate:
if user.rate == 0:
return False
total_seconds = user.balance / user.rate
LOG.info(_LI("###########Fuck user: %s"), user.to_dict())
run_date = timeutils.utcnow() + timedelta(seconds=total_seconds)
job_params = {'run_date': run_date}
job_id = self._generate_job_id(user.id, self.FREEZE)
self._add_job(self._freeze_task, job_id, **job_params)
self._add_job(job_id, self.FREEZE, **job_params)
# Save job to database
job = {'id': job_id,
'job_type': self.FREEZE,
@ -249,15 +217,7 @@ class CronScheduler(object):
job_id = self._generate_job_id(user.id, self.DAILY)
job_params = {'hour': random.randint(0, 23),
'minute': random.randint(0, 59)}
self._add_job(self._daily_task, job_id,
trigger_type='cron', **job_params)
# Save job to database
job = {'id': job_id,
'job_type': self.DAILY,
'scheduler_id': self.scheduler_id,
'parameters': job_params}
admin_context = bilean_context.get_admin_context()
db_api.job_create(admin_context, job)
self._add_job(job_id, self.DAILY, **job_params)
return True
def _generate_timer(self):