Separate consumption and recharge from event

Change-Id: I77d8ae8952f8a436105277f13ca469ea7130aa97
This commit is contained in:
lvdongbing 2016-04-11 05:33:00 -04:00
parent 2c9d918525
commit f0d0a1403a
17 changed files with 861 additions and 266 deletions

View File

@ -75,10 +75,10 @@ USER_KEYS = (
RESOURCE_KEYS = (
RES_ID, RES_USER_ID, RES_RULE_ID, RES_RESOURCE_TYPE, RES_PROPERTIES,
RES_RATE, RES_CREATED_AT, RES_UPDATED_AT, RES_DELETED_AT,
RES_RATE, RES_LAST_BILL, RES_CREATED_AT, RES_UPDATED_AT, RES_DELETED_AT,
) = (
'id', 'user_id', 'rule_id', 'resource_type', 'properties',
'rate', 'created_at', 'updated_at', 'deleted_at',
'rate', 'last_bill', 'created_at', 'updated_at', 'deleted_at',
)
RULE_KEYS = (
@ -90,11 +90,12 @@ RULE_KEYS = (
)
EVENT_KEYS = (
EVENT_ID, EVENT_USER_ID, EVENT_ACTION, EVENT_TIMESTAMP,
EVENT_RESOURCE_TYPE, EVENT_VALUE, EVENT_DELETED_AT,
EVENT_ID, EVENT_TIMESTAMP, EVENT_OBJ_ID, EVENT_OBJ_TYPE, EVENT_ACTION,
EVENT_USER_ID, EVENT_LEVEL, EVENT_STATUS, EVENT_STATUS_REASON,
EVENT_METADATA,
) = (
'id', 'user_id', 'action', 'timestamp',
'resource_type', 'value', 'deleted_at',
'id', 'timestamp', 'obj_id', 'obj_type', 'action',
'user_id', 'level', 'status', 'status_reason', 'metadata',
)
POLICY_KEYS = (
@ -104,3 +105,26 @@ POLICY_KEYS = (
'id', 'name', 'is_default', 'rules', 'metadata',
'created_at', 'updated_at', 'deleted_at',
)
CONSUMPTION_KEYS = (
CONSUMPTION_ID, CONSUMPTION_USER_ID, CONSUMPTION_RESOURCE_ID,
CONSUMPTION_RESOURCE_TYPE, CONSUMPTION_START_TIME, CONSUMPTION_END_TIME,
CONSUMPTION_RATE, CONSUMPTION_COST, CONSUMPTION_METADATA,
) = (
'id', 'user_id', 'resource_id',
'resource_type', 'start_time', 'end_time',
'rate', 'cost', 'metadata',
)
RECHARGE_KEYS = (
RECHARGE_ID, RECHARGE_USER_ID, RECHARGE_TYPE, RECHARGE_TIMESTAMP,
RECHARGE_METADATA,
) = (
'id', 'user_id', 'type', 'timestamp', 'metadata',
)
RECHARGE_TYPES = (
SELF_RECHARGE, SYSTEM_BONUS,
) = (
'Recharge', 'System bonus',
)

View File

@ -123,7 +123,7 @@ def get_service_context(set_project_id=False, **kwargs):
if set_project_id:
project = identity_service().conn.session.get_project_id()
service_creds.update(project=project)
return RequestContext(**service_creds)
return RequestContext(is_admin=True, **service_creds)
def get_admin_context(show_deleted=False):

View File

@ -120,6 +120,10 @@ class PolicyNotFound(BileanException):
msg_fmt = _("The policy (%(policy)s) could not be found.")
class MultipleDefaultPolicy(BileanException):
msg_fmt = _("More than one default policies found.")
class UserNotFound(BileanException):
msg_fmt = _("The user (%(user)s) could not be found.")
@ -159,6 +163,10 @@ class EventNotFound(BileanException):
msg_fmt = _("The event (%(event)s) could not be found.")
class ConsumptionNotFound(BileanException):
msg_fmt = _("The consumption (%(consumption)s) could not be found.")
class InvalidResource(BileanException):
msg_fmt = _("%(msg)")

View File

@ -129,16 +129,14 @@ def event_get(context, event_id, project_safe=True):
return IMPL.event_get(context, event_id, project_safe=project_safe)
def event_get_all(context, user_id=None, show_deleted=False,
filters=None, limit=None, marker=None,
sort_keys=None, sort_dir=None, project_safe=True,
start_time=None, end_time=None):
return IMPL.event_get_all(context, user_id=user_id,
show_deleted=show_deleted,
filters=filters, limit=limit,
marker=marker, sort_keys=sort_keys,
sort_dir=sort_dir, project_safe=project_safe,
start_time=start_time, end_time=end_time)
def event_get_all(context, limit=None, marker=None, sort_keys=None,
sort_dir=None, filters=None, project_safe=True):
return IMPL.event_get_all(context, limit=limit,
marker=marker,
sort_keys=sort_keys,
sort_dir=sort_dir,
filters=filters,
project_safe=project_safe)
def event_create(context, values):
@ -306,3 +304,42 @@ def service_get_by_host_and_binary(context, host, binary):
def service_get_all(context):
return IMPL.service_get_all(context)
# consumptions
def consumption_get(context, consumption_id, project_safe=True):
return IMPL.consumption_get(context, consumption_id,
project_safe=project_safe)
def consumption_get_all(context, limit=None, marker=None, sort_keys=None,
sort_dir=None, filters=None, project_safe=True):
return IMPL.consumption_get_all(context, limit=limit,
marker=marker,
sort_keys=sort_keys,
sort_dir=sort_dir,
filters=filters,
project_safe=project_safe)
def consumption_create(context, values):
return IMPL.consumption_create(context, values)
# recharges
def recharge_create(context, values):
return IMPL.recharge_create(context, values)
def recharge_get(context, recharge_id, project_safe=True):
return IMPL.recharge_get(context, recharge_id, project_safe=project_safe)
def recharge_get_all(context, limit=None, marker=None, sort_keys=None,
sort_dir=None, filters=None, project_safe=True):
return IMPL.recharge_get_all(context, limit=limit,
marker=marker,
sort_keys=sort_keys,
sort_dir=sort_dir,
filters=filters,
project_safe=project_safe)

View File

@ -351,32 +351,22 @@ def event_get(context, event_id, project_safe=True):
return event
def event_get_all(context, user_id=None, limit=None, marker=None,
sort_keys=None, sort_dir=None, filters=None,
start_time=None, end_time=None, project_safe=True,
show_deleted=False):
query = soft_delete_aware_query(context, models.Event,
show_deleted=show_deleted)
def event_get_all(context, limit=None, marker=None, sort_keys=None,
sort_dir=None, filters=None, project_safe=True):
query = model_query(context, models.Event)
if context.is_admin:
project_safe = False
if project_safe:
query = query.filter_by(user_id=context.project)
elif user_id:
query = query.filter_by(user_id=user_id)
if start_time:
query = query.filter_by(models.Event.timestamp >= start_time)
if end_time:
query = query.filter_by(models.Event.timestamp <= end_time)
if filters is None:
filters = {}
sort_key_map = {
consts.EVENT_ACTION: models.Event.action.key,
consts.EVENT_RESOURCE_TYPE: models.Event.resource_type.key,
consts.EVENT_LEVEL: models.Event.level.key,
consts.EVENT_TIMESTAMP: models.Event.timestamp.key,
consts.EVENT_USER_ID: models.Event.user_id.key,
consts.EVENT_STATUS: models.Event.status.key,
}
keys = _get_sort_keys(sort_keys, sort_key_map)
query = db_filters.exact_filter(query, models.Event, filters)
@ -875,3 +865,99 @@ def service_get_by_host_and_binary(context, host, binary):
def service_get_all(context):
return model_query(context, models.Service).all()
# consumptions
def consumption_get(context, consumption_id, project_safe=True):
query = model_query(context, models.Consumption)
consumption = query.get(consumption_id)
if consumption is None:
return None
if project_safe and context.project != consumption.user_id:
return None
return consumption
def consumption_get_all(context, limit=None, marker=None, sort_keys=None,
sort_dir=None, filters=None, project_safe=True):
query = model_query(context, models.Consumption)
if context.is_admin:
project_safe = False
if project_safe:
query = query.filter_by(user_id=context.project)
if filters is None:
filters = {}
sort_key_map = {
consts.CONSUMPTION_USER_ID: models.Consumption.user_id.key,
consts.CONSUMPTION_RESOURCE_TYPE: models.Consumption.resource_type.key,
consts.CONSUMPTION_START_TIME: models.Consumption.start_time.key,
}
keys = _get_sort_keys(sort_keys, sort_key_map)
query = db_filters.exact_filter(query, models.Consumption, filters)
return _paginate_query(context, query, models.Consumption,
limit=limit, marker=marker,
sort_keys=keys, sort_dir=sort_dir,
default_sort_keys=['id']).all()
def consumption_create(context, values):
consumption_ref = models.Consumption()
consumption_ref.update(values)
consumption_ref.save(_session(context))
return consumption_ref
def consumption_delete(context, consumption_id):
session = _session(context)
session.query(models.Consumption).filter_by(
id=consumption_id).delete(synchronize_session='fetch')
# recharges
def recharge_create(context, values):
recharge_ref = models.Recharge()
recharge_ref.update(values)
recharge_ref.save(_session(context))
return recharge_ref
def recharge_get(context, recharge_id, project_safe=True):
query = model_query(context, models.Recharge)
recharge = query.get(recharge_id)
if recharge is None:
return None
if project_safe and context.project != recharge.user_id:
return None
return recharge
def recharge_get_all(context, limit=None, marker=None, sort_keys=None,
sort_dir=None, filters=None, project_safe=True):
query = model_query(context, models.Recharge)
if context.is_admin:
project_safe = False
if project_safe:
query = query.filter_by(user_id=context.project)
if filters is None:
filters = {}
sort_key_map = {
consts.RECHARGE_USER_ID: models.Recharge.user_id.key,
consts.RECHARGE_TYPE: models.Recharge.type.key,
consts.RECHARGE_TIMESTAMP: models.Recharge.timestamp.key,
}
keys = _get_sort_keys(sort_keys, sort_key_map)
query = db_filters.exact_filter(query, models.Recharge, filters)
return _paginate_query(context, query, models.Recharge,
limit=limit, marker=marker,
sort_keys=keys, sort_dir=sort_dir,
default_sort_keys=['id']).all()

View File

@ -80,12 +80,10 @@ def upgrade(migrate_engine):
sqlalchemy.String(36),
sqlalchemy.ForeignKey('user.id'),
nullable=False),
sqlalchemy.Column('rule_id',
sqlalchemy.String(36),
sqlalchemy.ForeignKey('rule.id'),
nullable=False),
sqlalchemy.Column('rule_id', sqlalchemy.String(36), nullable=False),
sqlalchemy.Column('resource_type', sqlalchemy.String(36),
nullable=False),
sqlalchemy.Column('last_bill', sqlalchemy.DateTime),
sqlalchemy.Column('properties', types.Dict),
sqlalchemy.Column('rate', sqlalchemy.Float, nullable=False),
sqlalchemy.Column('created_at', sqlalchemy.DateTime),
@ -99,13 +97,45 @@ def upgrade(migrate_engine):
'event', meta,
sqlalchemy.Column('id', sqlalchemy.String(36),
primary_key=True, nullable=False),
sqlalchemy.Column('user_id', sqlalchemy.String(36),
sqlalchemy.ForeignKey('user.id'), nullable=False),
sqlalchemy.Column('timestamp', sqlalchemy.DateTime),
sqlalchemy.Column('resource_type', sqlalchemy.String(36)),
sqlalchemy.Column('obj_id', sqlalchemy.String(36)),
sqlalchemy.Column('obj_type', sqlalchemy.String(36)),
sqlalchemy.Column('obj_name', sqlalchemy.String(255)),
sqlalchemy.Column('action', sqlalchemy.String(36)),
sqlalchemy.Column('user_id', sqlalchemy.String(36)),
sqlalchemy.Column('level', sqlalchemy.Integer),
sqlalchemy.Column('status', sqlalchemy.String(255)),
sqlalchemy.Column('status_reason', sqlalchemy.Text),
sqlalchemy.Column('meta_data', types.Dict),
mysql_engine='InnoDB',
mysql_charset='utf8'
)
consumption = sqlalchemy.Table(
'consumption', meta,
sqlalchemy.Column('id', sqlalchemy.String(36),
primary_key=True, nullable=False),
sqlalchemy.Column('user_id', sqlalchemy.String(36)),
sqlalchemy.Column('resource_id', sqlalchemy.String(36)),
sqlalchemy.Column('resource_type', sqlalchemy.String(255)),
sqlalchemy.Column('start_time', sqlalchemy.DateTime),
sqlalchemy.Column('end_time', sqlalchemy.DateTime),
sqlalchemy.Column('rate', sqlalchemy.Float),
sqlalchemy.Column('cost', sqlalchemy.Float),
sqlalchemy.Column('meta_data', types.Dict),
mysql_engine='InnoDB',
mysql_charset='utf8'
)
recharge = sqlalchemy.Table(
'recharge', meta,
sqlalchemy.Column('id', sqlalchemy.String(36),
primary_key=True, nullable=False),
sqlalchemy.Column('user_id', sqlalchemy.String(36)),
sqlalchemy.Column('type', sqlalchemy.String(255)),
sqlalchemy.Column('timestamp', sqlalchemy.DateTime),
sqlalchemy.Column('value', sqlalchemy.Float),
sqlalchemy.Column('deleted_at', sqlalchemy.DateTime),
sqlalchemy.Column('meta_data', types.Dict),
mysql_engine='InnoDB',
mysql_charset='utf8'
)
@ -180,6 +210,8 @@ def upgrade(migrate_engine):
rule,
resource,
event,
consumption,
recharge,
action,
dependency,
user_lock,

View File

@ -143,14 +143,12 @@ class Resource(BASE, BileanBase, SoftDelete, models.TimestampMixin):
sqlalchemy.String(36),
sqlalchemy.ForeignKey('user.id'),
nullable=False)
rule_id = sqlalchemy.Column(
sqlalchemy.String(36),
sqlalchemy.ForeignKey('rule.id'),
nullable=True)
rule_id = sqlalchemy.Column(sqlalchemy.String(36), nullable=True)
user = relationship(User, backref=backref('resources'))
resource_type = sqlalchemy.Column(sqlalchemy.String(36), nullable=False)
properties = sqlalchemy.Column(types.Dict)
rate = sqlalchemy.Column(sqlalchemy.Float, nullable=False)
last_bill = sqlalchemy.Column(sqlalchemy.DateTime)
properties = sqlalchemy.Column(types.Dict)
class Action(BASE, BileanBase, StateAware, models.TimestampMixin):
@ -187,21 +185,52 @@ class ActionDependency(BASE, BileanBase):
nullable=False)
class Event(BASE, BileanBase, SoftDelete):
class Event(BASE, BileanBase, StateAware):
"""Represents an event generated by the bilean engine."""
__tablename__ = 'event'
id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True,
default=lambda: UUID4())
user_id = sqlalchemy.Column(sqlalchemy.String(36),
sqlalchemy.ForeignKey('user.id'),
nullable=False)
user = relationship(User, backref=backref('events'))
action = sqlalchemy.Column(sqlalchemy.String(36))
timestamp = sqlalchemy.Column(sqlalchemy.DateTime)
resource_type = sqlalchemy.Column(sqlalchemy.String(36))
obj_id = sqlalchemy.Column(sqlalchemy.String(36))
obj_type = sqlalchemy.Column(sqlalchemy.String(36))
obj_name = sqlalchemy.Column(sqlalchemy.String(255))
action = sqlalchemy.Column(sqlalchemy.String(36))
user_id = sqlalchemy.Column(sqlalchemy.String(36))
level = sqlalchemy.Column(sqlalchemy.Integer)
meta_data = sqlalchemy.Column(types.Dict)
class Consumption(BASE, BileanBase):
"""Consumption objects."""
__tablename__ = 'consumption'
id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True,
default=lambda: UUID4())
user_id = sqlalchemy.Column(sqlalchemy.String(36))
resource_id = sqlalchemy.Column(sqlalchemy.String(36))
resource_type = sqlalchemy.Column(sqlalchemy.String(255))
start_time = sqlalchemy.Column(sqlalchemy.DateTime)
end_time = sqlalchemy.Column(sqlalchemy.DateTime)
rate = sqlalchemy.Column(sqlalchemy.Float)
cost = sqlalchemy.Column(sqlalchemy.Float)
meta_data = sqlalchemy.Column(types.Dict)
class Recharge(BASE, BileanBase):
"""Recharge history."""
__tablename__ = 'recharge'
id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True,
default=lambda: UUID4())
user_id = sqlalchemy.Column(sqlalchemy.String(36))
type = sqlalchemy.Column(sqlalchemy.String(255))
timestamp = sqlalchemy.Column(sqlalchemy.DateTime)
value = sqlalchemy.Column(sqlalchemy.Float)
meta_data = sqlalchemy.Column(types.Dict)
class Job(BASE, BileanBase):

View File

@ -22,6 +22,7 @@ from bilean.common import exception
from bilean.common.i18n import _
from bilean.common.i18n import _LE
from bilean.db import api as db_api
from bilean.engine import event as EVENT
wallclock = time.time
LOG = logging.getLogger(__name__)
@ -262,12 +263,12 @@ class Action(object):
expected_statuses = (self.SUSPENDED)
if self.status not in expected_statuses:
msg = _LE("Action (%(action)s) is in unexpected status "
"(%(actual)s) while expected status should be one of "
"(%(expected)s).") % dict(action=self.id,
expected=expected_statuses,
actual=self.status)
LOG.error(msg)
reason = _("Action (%(action)s) is in unexpected status "
"(%(actual)s) while expected status should be one of "
"(%(expected)s).") % dict(action=self.id,
expected=expected_statuses,
actual=self.status)
EVENT.error(self.context, self, cmd, status_reason=reason)
return
db_api.action_signal(self.context, self.id, cmd)
@ -311,6 +312,13 @@ class Action(object):
# We abandon it and then notify other dispatchers to execute it
db_api.action_abandon(self.context, self.id)
if status == self.SUCCEEDED:
EVENT.info(self.context, self, self.action, status, reason)
elif status == self.READY:
EVENT.warning(self.context, self, self.action, status, reason)
else:
EVENT.error(self.context, self, self.action, status, reason)
self.status = status
self.status_reason = reason
@ -327,6 +335,7 @@ class Action(object):
def _check_signal(self):
# Check timeout first, if true, return timeout message
if self.timeout is not None and self.is_timeout():
EVENT.debug(self.context, self, self.action, 'TIMEOUT')
return self.RES_TIMEOUT
result = db_api.action_signal_query(self.context, self.id)

View File

@ -17,8 +17,10 @@ from bilean.common.i18n import _
from bilean.common.i18n import _LE
from bilean.common.i18n import _LI
from bilean.engine.actions import base
from bilean.engine import event as EVENT
from bilean.engine.flows import flow as bilean_flow
from bilean.engine import lock as bilean_lock
from bilean.engine import user as user_mod
from bilean.resources import base as resource_base
from oslo_log import log as logging
@ -37,12 +39,28 @@ class UserAction(base.Action):
'USER_SETTLE_ACCOUNT',
)
def __init__(self, target, action, context, **kwargs):
"""Constructor for a user action object.
:param target: ID of the target user object on which the action is to
be executed.
:param action: The name of the action to be executed.
:param context: The context used for accessing the DB layer.
:param dict kwargs: Additional parameters that can be passed to the
action.
"""
super(UserAction, self).__init__(target, action, context, **kwargs)
try:
self.user = user_mod.User.load(self.context, user_id=self.target)
except Exception:
self.user = None
def do_create_resource(self):
resource = resource_base.Resource.from_dict(self.inputs)
try:
flow_engine = bilean_flow.get_flow(self.context,
resource,
'create')
flow_engine = bilean_flow.get_create_resource_flow(
self.context, self.target, resource)
with bilean_flow.DynamicLogListener(flow_engine, logger=LOG):
flow_engine.run()
except Exception as ex:
@ -55,7 +73,8 @@ class UserAction(base.Action):
def do_update_resource(self):
try:
resource_id = self.inputs.get('id')
values = self.inputs
resource_id = values.pop('id', None)
resource = resource_base.Resource.load(
self.context, resource_id=resource_id)
except exception.ResourceNotFound:
@ -64,9 +83,8 @@ class UserAction(base.Action):
return self.RES_ERROR, _('Resource not found.')
try:
flow_engine = bilean_flow.get_flow(self.context,
resource,
'update')
flow_engine = bilean_flow.get_update_resource_flow(
self.context, self.target, resource, values)
with bilean_flow.DynamicLogListener(flow_engine, logger=LOG):
flow_engine.run()
except Exception as ex:
@ -89,9 +107,8 @@ class UserAction(base.Action):
return self.RES_ERROR, _('Resource not found.')
try:
flow_engine = bilean_flow.get_flow(self.context,
resource,
'delete')
flow_engine = bilean_flow.get_delete_resource_flow(
self.context, self.target, resource)
with bilean_flow.DynamicLogListener(flow_engine, logger=LOG):
flow_engine.run()
except Exception as ex:
@ -126,6 +143,7 @@ class UserAction(base.Action):
if method is None:
reason = _('Unsupported action: %s') % self.action
EVENT.error(self.context, self.user, self.action, 'Failed', reason)
return self.RES_ERROR, reason
return method()

View File

@ -0,0 +1,118 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from bilean.common import exception
from bilean.common import utils
from bilean.db import api as db_api
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
class Consumption(object):
"""Class reference to consumption record."""
def __init__(self, user_id, **kwargs):
self.id = kwargs.get('id')
self.user_id = user_id
self.resource_id = kwargs.get('resource_id')
self.resource_type = kwargs.get('resource_type')
self.start_time = kwargs.get('start_time')
self.end_time = kwargs.get('end_time')
self.rate = kwargs.get('rate')
self.cost = kwargs.get('cost')
self.metadata = kwargs.get('metadata')
@classmethod
def from_db_record(cls, record):
'''Construct a consumption object from a database record.'''
kwargs = {
'id': record.id,
'resource_id': record.resource_id,
'resource_type': record.resource_type,
'start_time': record.start_time,
'end_time': record.end_time,
'rate': record.rate,
'cost': record.cost,
'metadata': record.meta_data,
}
return cls(record.user_id, **kwargs)
@classmethod
def load(cls, context, db_consumption=None, consumption_id=None,
project_safe=True):
'''Retrieve a consumption record from database.'''
if db_consumption is not None:
return cls.from_db_record(db_consumption)
record = db_api.consumption_get(context, consumption_id,
project_safe=project_safe)
if record is None:
raise exception.ConsumptionNotFound(consumption=consumption_id)
return cls.from_db_record(record)
@classmethod
def load_all(cls, context, limit=None, marker=None, sort_keys=None,
sort_dir=None, filters=None, project_safe=True):
'''Retrieve all consumptions from database.'''
records = db_api.consumption_get_all(context, limit=limit,
marker=marker,
filters=filters,
sort_keys=sort_keys,
sort_dir=sort_dir,
project_safe=project_safe)
for record in records:
yield cls.from_db_record(record)
def store(self, context):
'''Store the consumption into database and return its ID.'''
values = {
'user_id': self.user_id,
'resource_id': self.resource_id,
'resource_type': self.resource_type,
'start_time': self.start_time,
'end_time': self.end_time,
'rate': self.rate,
'cost': self.cost,
'meta_data': self.metadata,
}
consumption = db_api.consumption_create(context, values)
self.id = consumption.id
return self.id
def delete(self, context):
'''Delete consumption from database.'''
db_api.consumption_delete(context, self.id)
def to_dict(self):
consumption = {
'id': self.id,
'user_id': self.user_id,
'resource_id': self.resource_id,
'resource_type': self.resource_type,
'start_time': utils.format_time(self.start_time),
'end_time': utils.format_time(self.end_time),
'rate': self.rate,
'cost': self.cost,
'metadata': self.metadata,
}
return consumption

View File

@ -11,29 +11,52 @@
# License for the specific language governing permissions and limitations
# under the License.
import six
import logging
from bilean.common import exception
from bilean.common.i18n import _
from bilean.common.i18n import _LC
from bilean.common.i18n import _LE
from bilean.common.i18n import _LI
from bilean.common.i18n import _LW
from bilean.common import utils
from bilean.db import api as db_api
from bilean.resources import base as resource_base
from oslo_log import log as logging
from oslo_log import log
from oslo_utils import reflection
from oslo_utils import timeutils
LOG = logging.getLogger(__name__)
LOG = log.getLogger(__name__)
class Event(object):
"""Class to deal with consumption record."""
'''capturing an interesting happening in Bilean.'''
def __init__(self, timestamp, **kwargs):
def __init__(self, timestamp, level, entity=None, **kwargs):
self.timestamp = timestamp
self.level = level
self.id = kwargs.get('id')
self.user_id = kwargs.get('user_id')
self.action = kwargs.get('action')
self.resource_type = kwargs.get('resource_type')
self.value = kwargs.get('value', 0)
self.status = kwargs.get('status')
self.status_reason = kwargs.get('status_reason')
self.obj_id = kwargs.get('obj_id')
self.obj_type = kwargs.get('obj_type')
self.obj_name = kwargs.get('obj_name')
self.metadata = kwargs.get('metadata')
cntx = kwargs.get('context')
if cntx is not None:
self.user_id = cntx.project
if entity is not None:
self.obj_id = entity.id
self.obj_name = entity.name
e_type = reflection.get_class_name(entity, fully_qualified=False)
self.obj_type = e_type.upper()
@classmethod
def from_db_record(cls, record):
@ -43,11 +66,14 @@ class Event(object):
'id': record.id,
'user_id': record.user_id,
'action': record.action,
'resource_type': record.resource_type,
'action': record.action,
'value': record.value,
'status': record.status,
'status_reason': record.status_reason,
'obj_id': record.obj_id,
'obj_type': record.obj_type,
'obj_name': record.obj_name,
'metadata': record.meta_data,
}
return cls(record.timestamp, **kwargs)
return cls(record.timestamp, record.level, **kwargs)
@classmethod
def load(cls, context, db_event=None, event_id=None, project_safe=True):
@ -62,19 +88,16 @@ class Event(object):
return cls.from_db_record(record)
@classmethod
def load_all(cls, context, user_id=None, limit=None, marker=None,
sort_keys=None, sort_dir=None, filters=None,
start_time=None, end_time=None, project_safe=True,
show_deleted=False,):
def load_all(cls, context, limit=None, marker=None, sort_keys=None,
sort_dir=None, filters=None, project_safe=True):
'''Retrieve all events from database.'''
records = db_api.event_get_all(context, user_id=user_id, limit=limit,
marker=marker, filters=filters,
sort_keys=sort_keys, sort_dir=sort_dir,
start_time=start_time,
end_time=end_time,
project_safe=project_safe,
show_deleted=show_deleted)
records = db_api.event_get_all(context, limit=limit,
marker=marker,
filters=filters,
sort_keys=sort_keys,
sort_dir=sort_dir,
project_safe=project_safe)
for record in records:
yield cls.from_db_record(record)
@ -83,11 +106,15 @@ class Event(object):
'''Store the event into database and return its ID.'''
values = {
'timestamp': self.timestamp,
'level': self.level,
'user_id': self.user_id,
'action': self.action,
'resource_type': self.resource_type,
'action': self.action,
'value': self.value,
'status': self.status,
'status_reason': self.status_reason,
'obj_id': self.obj_id,
'obj_type': self.obj_type,
'obj_name': self.obj_name,
'meta_data': self.metadata,
}
event = db_api.event_create(context, values)
@ -95,79 +122,92 @@ class Event(object):
return self.id
@classmethod
def from_dict(cls, **kwargs):
timestamp = kwargs.pop('timestamp')
return cls(timestamp, kwargs)
def to_dict(self):
evt = {
'id': self.id,
'level': self.level,
'user_id': self.user_id,
'action': self.action,
'resource_type': self.resource_type,
'action': self.action,
'value': self.value,
'status': self.status,
'status_reason': self.status_reason,
'obj_id': self.obj_id,
'obj_type': self.obj_type,
'obj_name': self.obj_name,
'timestamp': utils.format_time(self.timestamp),
'metadata': self.metadata,
}
return evt
def record(context, user, timestamp=None, action='charge', cause_resource=None,
resource_action=None, extra_cost=0, value=0):
"""Generate events for specify user
def critical(context, entity, action, status=None, status_reason=None,
timestamp=None):
timestamp = timestamp or timeutils.utcnow()
event = Event(timestamp, logging.CRITICAL, entity,
action=action, status=status, status_reason=status_reason,
user_id=context.project)
event.store(context)
LOG.critical(_LC('%(name)s [%(id)s] - %(status)s: %(reason)s'),
{'name': event.obj_name,
'id': event.obj_id and event.obj_id[:8],
'status': status,
'reason': status_reason})
:param context: oslo.messaging.context
:param user: object user to mark event
:param action: action of event, include 'charge' and 'recharge'
:param cause_resource: object resource which triggered the action
:param resource_action: action of resource
:param extra_cost: extra cost of the resource
:param timestamp: timestamp when event occurs
:param value: value of recharge, needed when action is 'recharge'
"""
if timestamp is None:
timestamp = timeutils.utcnow()
try:
if action == 'charge':
resources = resource_base.Resource.load_all(
context, user_id=user.id, project_safe=False)
seconds = (timestamp - user.last_bill).total_seconds()
res_mapping = {}
for resource in resources:
if cause_resource and resource.id == cause_resource.id:
if resource_action == 'create':
usage = extra_cost
elif resource_action == 'update':
usage = resource.rate * seconds + extra_cost
else:
usage = resource.rate * seconds
if res_mapping.get(resource.resource_type) is None:
res_mapping[resource.resource_type] = usage
else:
res_mapping[resource.resource_type] += usage
if resource_action == 'delete':
usage = cause_resource.rate * seconds + extra_cost
if res_mapping.get(cause_resource.resource_type) is None:
res_mapping[cause_resource.resource_type] = 0
res_mapping[cause_resource.resource_type] += usage
def error(context, entity, action, status=None, status_reason=None,
timestamp=None):
timestamp = timestamp or timeutils.utcnow()
event = Event(timestamp, logging.ERROR, entity,
action=action, status=status, status_reason=status_reason,
user_id=context.project)
event.store(context)
LOG.error(_LE('%(name)s [%(id)s] %(action)s - %(status)s: %(reason)s'),
{'name': event.obj_name,
'id': event.obj_id and event.obj_id[:8],
'action': action,
'status': status,
'reason': status_reason})
for res_type in res_mapping.keys():
event = Event(timestamp,
user_id=user.id,
action=action,
resource_type=res_type,
value=res_mapping.get(res_type))
event.store(context)
elif action == 'recharge':
event = Event(timestamp,
user_id=user.id,
action=action,
value=value)
event.store(context)
else:
msg = _("Unsupported event action '%s'.") % action
raise exception.BileanException(msg=msg)
except Exception as exc:
LOG.error(_("Error generate events: %s") % six.text_type(exc))
def warning(context, entity, action, status=None, status_reason=None,
timestamp=None):
timestamp = timestamp or timeutils.utcnow()
event = Event(timestamp, logging.WARNING, entity,
action=action, status=status, status_reason=status_reason,
user_id=context.project)
event.store(context)
LOG.warning(_LW('%(name)s [%(id)s] %(action)s - %(status)s: %(reason)s'),
{'name': event.obj_name,
'id': event.obj_id and event.obj_id[:8],
'action': action,
'status': status,
'reason': status_reason})
def info(context, entity, action, status=None, status_reason=None,
timestamp=None):
timestamp = timestamp or timeutils.utcnow()
event = Event(timestamp, logging.INFO, entity,
action=action, status=status, status_reason=status_reason,
user_id=context.project)
event.store(context)
LOG.info(_LI('%(name)s [%(id)s] %(action)s - %(status)s: %(reason)s'),
{'name': event.obj_name,
'id': event.obj_id and event.obj_id[:8],
'action': action,
'status': status,
'reason': status_reason})
def debug(context, entity, action, status=None, status_reason=None,
timestamp=None):
timestamp = timestamp or timeutils.utcnow()
event = Event(timestamp, logging.DEBUG, entity,
action=action, status=status, status_reason=status_reason,
user_id=context.project)
event.store(context)
LOG.debug(_('%(name)s [%(id)s] %(action)s - %(status)s: %(reason)s'),
{'name': event.obj_name,
'id': event.obj_id and event.obj_id[:8],
'action': action,
'status': status,
'reason': status_reason})

View File

@ -69,12 +69,16 @@ class CreateResourceTask(task.Task):
def execute(self, context, resource, **kwargs):
user = user_mod.User.load(context, user_id=resource.user_id)
user_policy = policy_mod.Policy.load(context, policy_id=user.policy_id)
rule = user_policy.find_rule(context, resource.resource_type)
try:
policy = policy_mod.Policy.load(context, policy_id=user.policy_id)
except exception.PolicyNotFound:
policy = policy_mod.Policy.load_default(context)
if policy is not None:
rule = policy.find_rule(context, resource.resource_type)
# Update resource with rule_id and rate
resource.rule_id = rule.id
resource.rate = rule.get_price(resource)
# Update resource with rule_id and rate
resource.rule_id = rule.id
resource.rate = rule.get_price(resource)
resource.store(context)
def revert(self, context, resource, result, **kwargs):
@ -94,7 +98,7 @@ class UpdateResourceTask(task.Task):
resource.properties = values.get('properties')
rule = rule_base.Rule.load(context, rule_id=resource.rule_id)
resource.rate = rule.get_price(resource)
resource.d_rate = resource.rate - old_rate
resource.delta_rate = resource.rate - old_rate
resource.store(context)
def revert(self, context, resource, resource_bak, result, **kwargs):
@ -122,6 +126,25 @@ class DeleteResourceTask(task.Task):
resource.store(context)
class CreateConsumptionTask(task.Task):
"""Generate consumption record and store to db."""
def execute(self, context, resource, *args, **kwargs):
consumption = resource.consumption
if consumption is not None:
consumption.store(context)
def revert(self, context, resource, result, *args, **kwargs):
if isinstance(result, ft.Failure):
LOG.error(_LE("Error when storing consumption of resource: %s"),
resource.id)
return
consumption = resource.consumption
if consumption is not None:
consumption.delete(context)
class LoadUserTask(task.Task):
"""Load user from db."""
@ -150,17 +173,18 @@ class SettleAccountTask(task.Task):
user.store(context)
class UpdateUserWithResourceTask(task.Task):
"""Update user with resource actions."""
class UpdateUserRateTask(task.Task):
"""Update user's rate ."""
def execute(self, context, user_obj, user_bak, resource,
resource_action, **kwargs):
user_obj.update_with_resource(context, resource,
resource_action=resource_action)
def execute(self, context, user_obj, user_bak, resource, *args, **kwargs):
user_obj.update_rate(context, resource.delta_rate,
timestamp=resource.last_bill,
delayed_cost=resource.delayed_cost)
def revert(self, context, user_bak, result, **kwargs):
def revert(self, context, user_obj, user_bak, resource, result,
*args, **kwargs):
if isinstance(result, ft.Failure):
LOG.error(_LE("Error when updating user: %s"), user_bak.get('id'))
LOG.error(_LE("Error when updating user: %s"), user_obj.id)
return
# Restore user
@ -196,25 +220,72 @@ def get_settle_account_flow(context, user_id, task=None):
return taskflow.engines.load(flow, store=kwargs)
def get_flow(context, resource, resource_action):
"""Constructs and returns resource task flow."""
def get_create_resource_flow(context, user_id, resource):
"""Constructs and returns user task flow.
flow_name = resource.user_id + '_' + resource_action + '_resource'
:param context: The request context.
:param user_id: The ID of user.
:param resource: Object resource to create.
"""
flow_name = user_id + '_create_resource'
flow = linear_flow.Flow(flow_name)
kwargs = {
'context': context,
'user_id': resource.user_id,
'user_id': user_id,
'resource': resource,
'resource_action': resource_action,
}
if resource_action == 'create':
flow.add(CreateResourceTask())
if resource_action == 'update':
flow.add(UpdateResourceTask())
kwargs['resource_bak'] = resource.to_dict()
elif resource_action == 'delete':
flow.add(DeleteResourceTask())
flow.add(LoadUserTask(),
UpdateUserWithResourceTask(),
flow.add(CreateResourceTask(),
LoadUserTask(),
UpdateUserRateTask(),
UpdateUserJobsTask())
return taskflow.engines.load(flow, store=kwargs)
def get_delete_resource_flow(context, user_id, resource):
"""Constructs and returns user task flow.
:param context: The request context.
:param user_id: The ID of user.
:param resource: Object resource to delete.
"""
flow_name = user_id + '_delete_resource'
flow = linear_flow.Flow(flow_name)
kwargs = {
'context': context,
'user_id': user_id,
'resource': resource,
}
flow.add(DeleteResourceTask(),
CreateConsumptionTask(),
LoadUserTask(),
UpdateUserRateTask(),
UpdateUserJobsTask())
return taskflow.engines.load(flow, store=kwargs)
def get_update_resource_flow(context, user_id, resource, values):
"""Constructs and returns user task flow.
:param context: The request context.
:param user_id: The ID of user.
:param resource: Object resource to update.
:param values: The values to update.
"""
flow_name = user_id + '_update_resource'
flow = linear_flow.Flow(flow_name)
kwargs = {
'context': context,
'user_id': user_id,
'resource': resource,
'resource_bak': resource.to_dict(),
'values': values,
}
flow.add(UpdateResourceTask(),
CreateConsumptionTask(),
LoadUserTask(),
UpdateUserRateTask(),
UpdateUserJobsTask())
return taskflow.engines.load(flow, store=kwargs)

View File

@ -83,6 +83,18 @@ class Policy(object):
return cls._from_db_record(policy)
@classmethod
def load_default(cls, context, show_deleted=False):
'''Retrieve default policy from database.'''
filters = {'is_default': True}
policies = cls.load_all(context, filters=filters,
show_deleted=show_deleted)
if len(policies) > 1:
raise exception.MultipleDefaultPolicy()
policy = None if len(policies) < 1 else policies[0]
return policy
@classmethod
def load_all(cls, context, limit=None, marker=None,
sort_keys=None, sort_dir=None,
@ -98,15 +110,6 @@ class Policy(object):
return [cls._from_db_record(record) for record in records]
def find_rule(self, context, rtype):
'''Find the exact rule from self.rules by rtype'''
for rule in self.rules:
if rtype == rule['type'].split('-')[0]:
return rule_base.Rule.load(context, rule_id=rule['id'])
raise exception.RuleNotFound(rule_type=rtype)
def to_dict(self):
policy_dict = {
'id': self.id,
@ -123,3 +126,12 @@ class Policy(object):
def do_delete(self, context):
db_api.policy_delete(context, self.id)
return True
def find_rule(self, context, rtype):
'''Find the exact rule from self.rules by rtype'''
for rule in self.rules:
if rtype == rule['type'].split('-')[0]:
return rule_base.Rule.load(context, rule_id=rule['id'])
raise exception.RuleNotFound(rule_type=rtype)

View File

@ -21,6 +21,7 @@ from oslo_log import log as logging
import oslo_messaging
from oslo_service import service
from oslo_service import threadgroup
from oslo_utils import timeutils
from bilean.common import consts
from bilean.common import context as bilean_context
@ -287,10 +288,19 @@ class EngineService(service.Service):
return user.to_dict()
@request_context
def user_recharge(self, cnxt, user_id, value):
def user_recharge(self, cnxt, user_id, value, recharge_type=None,
timestamp=None, metadata=None):
"""Do recharge for specify user."""
user = user_mod.User.load(cnxt, user_id=user_id)
user.do_recharge(cnxt, value)
try:
user = user_mod.User.load(cnxt, user_id=user_id)
except exception.UserNotFound as ex:
raise exception.BileanBadRequest(msg=six.text_type(ex))
recharge_type = recharge_type or consts.SELF_RECHARGE
timestamp = timestamp or timeutils.utcnow()
metadata = metadata or {}
user.do_recharge(cnxt, value, recharge_type=recharge_type,
timestamp=timestamp, metadata=metadata)
# As user has been updated, the billing job for the user
# should to be updated too.
bilean_scheduler.notify(bilean_scheduler.UPDATE_JOBS,

View File

@ -19,7 +19,6 @@ from bilean.common.i18n import _LI
from bilean.common import utils
from bilean.db import api as db_api
from bilean.drivers import base as driver_base
from bilean.engine import event as event_mod
from bilean import notifier as bilean_notifier
from bilean.resources import base as resource_base
@ -39,6 +38,8 @@ class User(object):
'INIT', 'FREE', 'ACTIVE', 'WARNING', 'FREEZE',
)
ALLOW_DELAY_TIME = 10
def __init__(self, user_id, **kwargs):
self.id = user_id
self.name = kwargs.get('name')
@ -212,66 +213,56 @@ class User(object):
self.status_reason = reason
self.store(context)
def update_with_resource(self, context, resource,
resource_action='create'):
'''Update user with resource'''
def update_rate(self, context, delta_rate, timestamp=None, delayed_cost=0):
"""Update user's rate and update user status.
now = timeutils.utcnow()
extra_cost = 0
if 'create' == resource_action:
d_rate = resource.rate
if resource.properties.get('created_at') is not None:
created_at = timeutils.parse_strtime(
resource.properties.get('created_at'))
extra_seconds = (now - created_at).total_seconds()
extra_cost = d_rate * extra_seconds
elif 'delete' == resource_action:
d_rate = -resource.rate
if resource.properties.get('deleted_at') is not None:
deleted_at = timeutils.parse_strtime(
resource.properties.get('deleted_at'))
extra_seconds = (now - deleted_at).total_seconds()
extra_cost = d_rate * extra_seconds
elif 'update' == resource_action:
d_rate = resource.d_rate
if resource.properties.get('updated_at') is not None:
updated_at = timeutils.parse_strtime(
resource.properties.get('updated_at'))
extra_seconds = (now - updated_at).total_seconds()
extra_cost = d_rate * extra_seconds
:param context: The request context.
:param delta_rate: Delta rate to change.
:param timestamp: The time that resource action occurs.
:param delayed_cost: User's action may be delayed by some reason,
adjust balance by delayed_cost.
"""
self._settle_account(context, extra_cost=extra_cost,
cause_resource=resource,
resource_action=resource_action)
self._change_user_rate(context, d_rate)
self.store(context)
if delta_rate == 0 and delayed_cost == 0:
return
# Settle account before update rate
self._settle_account(context, timestamp=timestamp,
delayed_cost=delayed_cost)
def _change_user_rate(self, context, d_rate):
# Update the rate of user
old_rate = self.rate
new_rate = old_rate + d_rate
new_rate = old_rate + delta_rate
if old_rate == 0 and new_rate > 0:
# Set last_bill when status change to 'ACTIVE' from 'FREE'
self.last_bill = timeutils.utcnow()
reason = _("Status change to 'ACTIVE' cause resource creation.")
self.status = self.ACTIVE
elif d_rate < 0:
self.status_reason = reason
elif delta_rate < 0:
if new_rate == 0 and self.balance >= 0:
reason = _("Status change to 'FREE' because of resource "
"deleting.")
"deletion.")
self.status = self.FREE
self.status_reason = reason
elif self.status == self.WARNING and not self.notify_or_not():
elif self.status == self.WARNING and not self._notify_or_not():
reason = _("Status change from 'WARNING' to 'ACTIVE' "
"because of resource deleting.")
"because of resource deletion.")
self.status = self.ACTIVE
self.status_reason = reason
self.rate = new_rate
self.store(context)
def do_recharge(self, context, value):
'''Do recharge for user.'''
if self.rate > 0 and self.status != self.FREEZE:
self._settle_account(context)
def do_recharge(self, context, value, recharge_type=None, timestamp=None,
metadata=None):
"""Recharge for user and update status.
param context: The request context.
param value: Recharge value.
param recharge_type: Rechage type, 'Recharge'|'System bonus'.
param timestamp: Record when recharge action occurs.
param metadata: Some other keyword.
"""
self.balance += value
if self.status == self.INIT and self.balance > 0:
self.status = self.FREE
self.status_reason = "Recharged"
@ -281,16 +272,22 @@ class User(object):
self.status = self.FREE
self.status_reason = reason
elif self.status == self.WARNING:
if not self.notify_or_not():
if not self._notify_or_not():
reason = _("Status change from 'WARNING' to 'ACTIVE' because "
"of recharge.")
self.status = self.ACTIVE
self.status_reason = reason
self.store(context)
event_mod.record(context, self, action='recharge', value=value)
def notify_or_not(self):
# Create recharge record
values = {'user_id': self.id,
'value': value,
'type': recharge_type,
'timestamp': timestamp,
'metadata': metadata}
db_api.recharge_create(context, values)
def _notify_or_not(self):
'''Check if user should be notified.'''
cfg.CONF.import_opt('prior_notify_time',
'bilean.scheduler.cron_scheduler',
@ -305,21 +302,22 @@ class User(object):
db_api.user_delete(context, self.id)
return True
def _settle_account(self, context, cause_resource=None,
resource_action=None, extra_cost=0):
if self.status not in [self.ACTIVE, self.WARNING]:
def _settle_account(self, context, timestamp=None, delayed_cost=0):
if self.rate == 0 and delayed_cost == 0:
LOG.info(_LI("Ignore settlement action because user is in '%s' "
"status."), self.status)
return
now = timeutils.utcnow()
total_seconds = (now - self.last_bill).total_seconds()
cost = self.rate * total_seconds + extra_cost
self.balance -= cost
event_mod.record(context, self, timestamp=now,
cause_resource=cause_resource,
resource_action=resource_action,
extra_cost=extra_cost)
self.last_bill = now
# Calculate user's cost before last_bill and now
cost = 0
if self.rate > 0 and self.last_bill:
timestamp = timestamp or timeutils.utcnow()
total_seconds = (timestamp - self.last_bill).total_seconds()
cost = self.rate * total_seconds
total_cost = cost + delayed_cost
self.balance -= total_cost
self.last_bill = timestamp
def settle_account(self, context, task=None):
'''Settle account for user.'''
@ -327,7 +325,7 @@ class User(object):
notifier = bilean_notifier.Notifier()
self._settle_account(context)
if task == 'notify' and self.notify_or_not():
if task == 'notify' and self._notify_or_not():
self.status_reason = "The balance is almost used up"
self.status = self.WARNING
# Notify user

View File

@ -14,8 +14,11 @@
from bilean.common import exception
from bilean.common import utils
from bilean.db import api as db_api
from bilean.engine import consumption as consumption_mod
from bilean.engine import environment
from oslo_utils import timeutils
class Resource(object):
"""A resource is an object that refers to a physical resource.
@ -25,8 +28,16 @@ class Resource(object):
something else.
"""
ALLOW_DELAY_TIME = 10
def __new__(cls, id, user_id, res_type, properties, **kwargs):
"""Create a new resource of the appropriate class."""
"""Create a new resource of the appropriate class.
:param id: The resource ID comes same as the real resource.
:param user_id: The user ID the resource belongs to.
:param properties: The properties of resource.
:param dict kwargs: Other keyword arguments for the resource.
"""
if cls != Resource:
ResourceClass = cls
else:
@ -42,12 +53,17 @@ class Resource(object):
self.rule_id = kwargs.get('rule_id')
self.rate = kwargs.get('rate', 0)
self.d_rate = 0
self.last_bill = kwargs.get('last_bill')
self.created_at = kwargs.get('created_at')
self.updated_at = kwargs.get('updated_at')
self.deleted_at = kwargs.get('deleted_at')
# Properties pass to user to help settle account, not store to db
self.delta_rate = 0
self.delayed_cost = 0
self.consumption = None
def store(self, context):
"""Store the resource record into database table."""
@ -57,22 +73,108 @@ class Resource(object):
'properties': self.properties,
'rule_id': self.rule_id,
'rate': self.rate,
'last_bill': self.last_bill,
'created_at': self.created_at,
'updated_at': self.updated_at,
'deleted_at': self.deleted_at,
}
if self.created_at:
db_api.resource_update(context, self.id, values)
self._update(context, values)
else:
values.update(id=self.id)
resource = db_api.resource_create(context, values)
self.created_at = resource.created_at
self._create(context, values)
return self.id
def delete(self, context, soft_delete=True):
'''Delete resource from db.'''
self._delete(context, soft_delete=soft_delete)
def _create(self, context, values):
self.delta_rate = self.rate
if self.delta_rate == 0:
resource = db_api.resource_create(context, values)
self.created_at = resource.created_at
return
now = timeutils.utcnow()
self.last_bill = now
create_time = self.properties.get('created_at')
if create_time is not None:
created_at = timeutils.parse_strtime(create_time)
delayed_seconds = (now - created_at).total_seconds()
# Engine handle resource creation is delayed because of something,
# we suppose less than ALLOW_DELAY_TIME is acceptable.
if delayed_seconds > self.ALLOW_DELAY_TIME:
self.delayed_cost = self.delta_rate * delayed_seconds
self.last_bill = created_at
values.update(last_bill=self.last_bill)
resource = db_api.resource_create(context, values)
self.created_at = resource.created_at
def _update(self, context, values):
if self.delta_rate == 0:
db_api.resource_update(context, self.id, values)
return
update_time = self.properties.get('updated_at')
now = timeutils.utcnow()
updated_at = now
if update_time is not None:
updated_at = timeutils.parse_strtime(update_time)
delayed_seconds = (now - updated_at).total_seconds()
# Engine handle resource update is delayed because of something,
# we suppose less than ALLOW_DELAY_TIME is acceptable.
if delayed_seconds > self.ALLOW_DELAY_TIME:
self.delayed_cost = self.delta_rate * delayed_seconds
# Generate consumption between lass bill and update time
old_rate = self.rate - self.delta_rate
cost = (updated_at - self.last_bill).total_seconds() * old_rate
params = {'resource_id': self.id,
'resource_type': self.resource_type,
'start_time': self.last_bill,
'end_time': updated_at,
'rate': old_rate,
'cost': cost,
'metadata': {'cause': 'Resource update'}}
self.consumption = consumption_mod.Consumption(self.user_id, **params)
self.last_bill = updated_at
values.update(last_bill=updated_at)
db_api.resource_update(context, self.id, values)
def _delete(self, context, soft_delete=True):
self.delta_rate = - self.rate
if self.delta_rate == 0:
db_api.resource_delete(context, self.id, soft_delete=soft_delete)
return
delete_time = self.properties.get('deleted_at')
now = timeutils.utcnow()
deleted_at = now
if delete_time is not None:
deleted_at = timeutils.parse_strtime(delete_time)
delayed_seconds = (now - deleted_at).total_seconds()
# Engine handle resource deletion is delayed because of something,
# we suppose less than ALLOW_DELAY_TIME is acceptable.
if delayed_seconds > self.ALLOW_DELAY_TIME:
self.delayed_cost = self.delta_rate * delayed_seconds
# Generate consumption between lass bill and delete time
cost = (deleted_at - self.last_bill).total_seconds() * self.rate
params = {'resource_id': self.id,
'resource_type': self.resource_type,
'start_time': self.last_bill,
'end_time': deleted_at,
'rate': self.rate,
'cost': cost,
'metadata': {'cause': 'Resource deletion'}}
self.consumption = consumption_mod.Consumption(self.user_id, **params)
self.last_bill = deleted_at
db_api.resource_delete(context, self.id, soft_delete=soft_delete)
@classmethod
@ -84,6 +186,7 @@ class Resource(object):
kwargs = {
'rule_id': record.rule_id,
'rate': record.rate,
'last_bill': record.last_bill,
'created_at': record.created_at,
'updated_at': record.updated_at,
'deleted_at': record.deleted_at,
@ -139,6 +242,7 @@ class Resource(object):
'properties': self.properties,
'rule_id': self.rule_id,
'rate': self.rate,
'last_bill': utils.format_time(self.last_bill),
'created_at': utils.format_time(self.created_at),
'updated_at': utils.format_time(self.updated_at),
'deleted_at': utils.format_time(self.deleted_at),

View File

@ -199,7 +199,6 @@ class CronScheduler(object):
if user.rate == 0:
return False
total_seconds = user.balance / user.rate
LOG.info(_LI("###########Fuck user: %s"), user.to_dict())
run_date = timeutils.utcnow() + timedelta(seconds=total_seconds)
job_params = {'run_date': run_date}
job_id = self._generate_job_id(user.id, self.FREEZE)