292 lines
9.5 KiB
Python
292 lines
9.5 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import os
|
|
|
|
from oslo_log import log as logging
|
|
import taskflow.engines
|
|
from taskflow.listeners import base
|
|
from taskflow.listeners import logging as logging_listener
|
|
from taskflow.patterns import linear_flow
|
|
from taskflow import task
|
|
from taskflow.types import failure as ft
|
|
|
|
from bilean.common import exception
|
|
from bilean.common.i18n import _LE
|
|
from bilean.engine import policy as policy_mod
|
|
from bilean.engine import user as user_mod
|
|
from bilean.resources import base as resource_base
|
|
from bilean.rules import base as rule_base
|
|
from bilean import scheduler as bilean_scheduler
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
class DynamicLogListener(logging_listener.DynamicLoggingListener):
|
|
"""This is used to attach to taskflow engines while they are running.
|
|
|
|
It provides a bunch of useful features that expose the actions happening
|
|
inside a taskflow engine, which can be useful for developers for debugging,
|
|
for operations folks for monitoring and tracking of the resource actions
|
|
and more...
|
|
"""
|
|
|
|
#: Exception is an excepted case, don't include traceback in log if fails.
|
|
_NO_TRACE_EXCEPTIONS = (exception.InvalidInput)
|
|
|
|
def __init__(self, engine,
|
|
task_listen_for=base.DEFAULT_LISTEN_FOR,
|
|
flow_listen_for=base.DEFAULT_LISTEN_FOR,
|
|
retry_listen_for=base.DEFAULT_LISTEN_FOR,
|
|
logger=LOG):
|
|
super(DynamicLogListener, self).__init__(
|
|
engine,
|
|
task_listen_for=task_listen_for,
|
|
flow_listen_for=flow_listen_for,
|
|
retry_listen_for=retry_listen_for,
|
|
log=logger)
|
|
|
|
def _format_failure(self, fail):
|
|
if fail.check(*self._NO_TRACE_EXCEPTIONS) is not None:
|
|
exc_info = None
|
|
exc_details = '%s%s' % (os.linesep, fail.pformat(traceback=False))
|
|
return (exc_info, exc_details)
|
|
else:
|
|
return super(DynamicLogListener, self)._format_failure(fail)
|
|
|
|
|
|
class CreateResourceTask(task.Task):
|
|
"""Create resource and store to db."""
|
|
|
|
def execute(self, context, resource, **kwargs):
|
|
user = user_mod.User.load(context, user_id=resource.user_id)
|
|
try:
|
|
policy = policy_mod.Policy.load(context, policy_id=user.policy_id)
|
|
except exception.PolicyNotFound:
|
|
policy = policy_mod.Policy.load_default(context)
|
|
if policy is not None:
|
|
rule = policy.find_rule(context, resource.resource_type)
|
|
|
|
# Update resource with rule_id and rate
|
|
resource.rule_id = rule.id
|
|
resource.rate = rule.get_price(resource)
|
|
resource.store(context)
|
|
|
|
def revert(self, context, resource, result, **kwargs):
|
|
if isinstance(result, ft.Failure):
|
|
LOG.error(_LE("Error when creating resource: %s"),
|
|
resource.to_dict())
|
|
return
|
|
|
|
resource.delete(context, soft_delete=False)
|
|
|
|
|
|
class UpdateResourceTask(task.Task):
|
|
"""Update resource."""
|
|
|
|
def execute(self, context, resource, values, resource_bak, **kwargs):
|
|
old_rate = resource.rate
|
|
resource.properties = values.get('properties')
|
|
rule = rule_base.Rule.load(context, rule_id=resource.rule_id)
|
|
resource.rate = rule.get_price(resource)
|
|
resource.delta_rate = resource.rate - old_rate
|
|
resource.store(context)
|
|
|
|
def revert(self, context, resource, resource_bak, result, **kwargs):
|
|
if isinstance(result, ft.Failure):
|
|
LOG.error(_LE("Error when updating resource: %s"), resource.id)
|
|
return
|
|
|
|
# restore resource
|
|
res = resource_base.Resource.from_dict(resource_bak)
|
|
res.store(context)
|
|
|
|
|
|
class DeleteResourceTask(task.Task):
|
|
"""Delete resource from db."""
|
|
|
|
def execute(self, context, resource, **kwargs):
|
|
resource.delete(context)
|
|
|
|
def revert(self, context, resource, result, **kwargs):
|
|
if isinstance(result, ft.Failure):
|
|
LOG.error(_LE("Error when deleting resource: %s"), resource.id)
|
|
return
|
|
|
|
resource.deleted_at = None
|
|
resource.store(context)
|
|
|
|
|
|
class CreateConsumptionTask(task.Task):
|
|
"""Generate consumption record and store to db."""
|
|
|
|
def execute(self, context, resource, *args, **kwargs):
|
|
consumption = resource.consumption
|
|
if consumption is not None:
|
|
consumption.store(context)
|
|
|
|
def revert(self, context, resource, result, *args, **kwargs):
|
|
if isinstance(result, ft.Failure):
|
|
LOG.error(_LE("Error when storing consumption of resource: %s"),
|
|
resource.id)
|
|
return
|
|
|
|
consumption = resource.consumption
|
|
if consumption is not None:
|
|
consumption.delete(context)
|
|
|
|
|
|
class LoadUserTask(task.Task):
|
|
"""Load user from db."""
|
|
|
|
default_provides = set(['user_bak', 'user_obj'])
|
|
|
|
def execute(self, context, user_id, **kwargs):
|
|
user_obj = user_mod.User.load(context, user_id=user_id)
|
|
return {
|
|
'user_bak': user_obj.to_dict(),
|
|
'user_obj': user_obj,
|
|
}
|
|
|
|
|
|
class SettleAccountTask(task.Task):
|
|
def execute(self, context, user_obj, user_bak, task, **kwargs):
|
|
user_obj.settle_account(context, task=task)
|
|
|
|
def revert(self, context, user_bak, result, **kwargs):
|
|
if isinstance(result, ft.Failure):
|
|
LOG.error(_LE("Error when settling account for user: %s"),
|
|
user_bak.get('id'))
|
|
return
|
|
|
|
# Restore user
|
|
user = user_mod.User.from_dict(user_bak)
|
|
user.store(context)
|
|
|
|
|
|
class UpdateUserRateTask(task.Task):
|
|
"""Update user's rate ."""
|
|
|
|
def execute(self, context, user_obj, user_bak, resource, *args, **kwargs):
|
|
user_obj.update_rate(context, resource.delta_rate,
|
|
timestamp=resource.last_bill,
|
|
delayed_cost=resource.delayed_cost)
|
|
|
|
def revert(self, context, user_obj, user_bak, resource, result,
|
|
*args, **kwargs):
|
|
if isinstance(result, ft.Failure):
|
|
LOG.error(_LE("Error when updating user: %s"), user_obj.id)
|
|
return
|
|
|
|
# Restore user
|
|
user = user_mod.User.from_dict(user_bak)
|
|
user.store(context)
|
|
|
|
|
|
class UpdateUserJobsTask(task.Task):
|
|
"""Update user jobs."""
|
|
|
|
def execute(self, user_obj, **kwargs):
|
|
res = bilean_scheduler.notify(bilean_scheduler.UPDATE_JOBS,
|
|
user=user_obj.to_dict())
|
|
if not res:
|
|
LOG.error(_LE("Error when updating user jobs: %s"), user_obj.id)
|
|
raise
|
|
|
|
|
|
def get_settle_account_flow(context, user_id, task=None):
|
|
"""Constructs and returns settle account task flow."""
|
|
|
|
flow_name = user_id + '_settle_account'
|
|
flow = linear_flow.Flow(flow_name)
|
|
kwargs = {
|
|
'context': context,
|
|
'user_id': user_id,
|
|
'task': task,
|
|
}
|
|
flow.add(LoadUserTask(),
|
|
SettleAccountTask())
|
|
if task != 'freeze':
|
|
flow.add(UpdateUserJobsTask())
|
|
return taskflow.engines.load(flow, store=kwargs)
|
|
|
|
|
|
def get_create_resource_flow(context, user_id, resource):
|
|
"""Constructs and returns user task flow.
|
|
|
|
:param context: The request context.
|
|
:param user_id: The ID of user.
|
|
:param resource: Object resource to create.
|
|
"""
|
|
|
|
flow_name = user_id + '_create_resource'
|
|
flow = linear_flow.Flow(flow_name)
|
|
kwargs = {
|
|
'context': context,
|
|
'user_id': user_id,
|
|
'resource': resource,
|
|
}
|
|
flow.add(CreateResourceTask(),
|
|
LoadUserTask(),
|
|
UpdateUserRateTask(),
|
|
UpdateUserJobsTask())
|
|
return taskflow.engines.load(flow, store=kwargs)
|
|
|
|
|
|
def get_delete_resource_flow(context, user_id, resource):
|
|
"""Constructs and returns user task flow.
|
|
|
|
:param context: The request context.
|
|
:param user_id: The ID of user.
|
|
:param resource: Object resource to delete.
|
|
"""
|
|
|
|
flow_name = user_id + '_delete_resource'
|
|
flow = linear_flow.Flow(flow_name)
|
|
kwargs = {
|
|
'context': context,
|
|
'user_id': user_id,
|
|
'resource': resource,
|
|
}
|
|
flow.add(DeleteResourceTask(),
|
|
CreateConsumptionTask(),
|
|
LoadUserTask(),
|
|
UpdateUserRateTask(),
|
|
UpdateUserJobsTask())
|
|
return taskflow.engines.load(flow, store=kwargs)
|
|
|
|
|
|
def get_update_resource_flow(context, user_id, resource, values):
|
|
"""Constructs and returns user task flow.
|
|
|
|
:param context: The request context.
|
|
:param user_id: The ID of user.
|
|
:param resource: Object resource to update.
|
|
:param values: The values to update.
|
|
"""
|
|
|
|
flow_name = user_id + '_update_resource'
|
|
flow = linear_flow.Flow(flow_name)
|
|
kwargs = {
|
|
'context': context,
|
|
'user_id': user_id,
|
|
'resource': resource,
|
|
'resource_bak': resource.to_dict(),
|
|
'values': values,
|
|
}
|
|
flow.add(UpdateResourceTask(),
|
|
CreateConsumptionTask(),
|
|
LoadUserTask(),
|
|
UpdateUserRateTask(),
|
|
UpdateUserJobsTask())
|
|
return taskflow.engines.load(flow, store=kwargs)
|