Fix pep8 and revise db

Change-Id: I0fc0e8809f776d4c6afabb47e0587d624365b977
This commit is contained in:
lvdongbing 2015-12-29 04:31:47 -05:00
parent 379b0a26d9
commit 1ad368ac31
26 changed files with 470 additions and 783 deletions

View File

@ -11,9 +11,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import itertools
import six
from webob import exc
from bilean.api.openstack.v1 import util

View File

@ -44,7 +44,8 @@ service_opts = [
help=_('Number of heat-engine processes to fork and run.')),
cfg.StrOpt('environment_dir',
default='/etc/bilean/environments',
help=_('The directory to search for environment files.')),]
help=_('The directory to search for environment files.')),
]
rpc_opts = [
cfg.StrOpt('host',

View File

@ -200,6 +200,7 @@ class EventNotFound(BileanException):
class InvalidResource(BileanException):
msg_fmt = _("%(msg)")
class InternalError(BileanException):
'''A base class for internal exceptions in bilean.

View File

@ -83,7 +83,7 @@ def setup(url=None, optional=False):
def cleanup():
"""Cleanup the oslo_messaging layer."""
global TRANSPORTS, NOTIFIER
global TRANSPORT, TRANSPORTS, NOTIFIER
for url in TRANSPORTS:
TRANSPORTS[url].cleanup()
del TRANSPORTS[url]

View File

@ -11,11 +11,6 @@
# License for the specific language governing permissions and limitations
# under the License.
# RESOURCE_TYPES = ["instance", "volume", "bandwidth", "ha", "router", "rdb",
# "load_balancer", "snapshot", "self_image"]
# RESOURCE_STATUS = ["active", "paused"]
MIN_VALUE = "1"
MAX_VALUE = "100000000"
@ -30,75 +25,37 @@ RPC_ATTRs = (
'billing_notifications',
'1.0',
)
USER_KEYS = (
USER_ID,
USER_BALANCE,
USER_RATE,
USER_CREDIT,
USER_LAST_BILL,
USER_STATUS,
USER_STATUS_REASION,
USER_CREATED_AT,
USER_UPDATED_AT
USER_ID, USER_POLICY_ID, USER_BALANCE, USER_RATE, USER_CREDIT,
USER_LAST_BILL, USER_STATUS, USER_STATUS_REASION, USER_CREATED_AT,
USER_UPDATED_AT, USER_DELETED_AT,
) = (
'id',
'balance',
'rate',
'credit',
'last_bill',
'status',
'status_reason',
'created_at',
'updated_at'
'id', 'policy_id', 'balance', 'rate', 'credit',
'last_bill', 'status', 'status_reason', 'created_at',
'updated_at', 'deleted_at',
)
RES_KEYS = (
RES_ID,
RES_RESOURCE_TYPE,
RES_SIZE,
RES_RATE,
RES_STATUS,
RES_STATUS_REASON,
RES_USER_ID,
RES_RULE_ID,
RES_RESOURCE_REF,
RES_CREATED_AT,
RES_UPDATED_AT,
RES_DELETED_AT,
RES_DELETED
RESOURCE_KEYS = (
RES_ID, RES_USER_ID, RES_RULE_ID, RES_RESOURCE_TYPE, RES_PROPERTIES,
RES_RATE, RES_CREATED_AT, RES_UPDATED_AT, RES_DELETED_AT,
) = (
'id',
'resource_type',
'size',
'rate',
'status',
'status_reason',
'user_id',
'rule_id',
'resource_ref',
'created_at',
'updated_at',
'deleted_at',
'deleted'
'id', 'user_id', 'rule_id', 'resource_type', 'properties',
'rate', 'created_at', 'updated_at', 'deleted_at',
)
RULE_KEYS = (
RULE_ID,
RULE_NAME,
RULE_TYPE,
RULE_SPEC,
RULE_METADATA,
RULE_UPDATED_AT,
RULE_CREATED_AT,
RULE_DELETED_AT,
RULE_ID, RULE_NAME, RULE_TYPE, RULE_SPEC, RULE_METADATA,
RULE_UPDATED_AT, RULE_CREATED_AT, RULE_DELETED_AT,
) = (
'id',
'name',
'type',
'spec',
'metadata',
'updated_at',
'created_at',
'deleted_at',
'id', 'name', 'type', 'spec', 'metadata',
'updated_at', 'created_at', 'deleted_at',
)
EVENT_KEYS = (
EVENT_ID, EVENT_USER_ID, EVENT_ACTION, EVENT_TIMESTAMP,
EVENT_RESOURCE_TYPE, EVENT_VALUE, EVENT_DELETED_AT,
) = (
'id', 'user_id', 'action', 'timestamp',
'resource_type', 'value', 'deleted_at',
)

View File

@ -41,8 +41,10 @@ def db_version(engine):
return IMPL.db_version(engine)
def user_get(context, user_id):
return IMPL.user_get(context, user_id)
def user_get(context, user_id, show_deleted=False, tenant_safe=True):
return IMPL.user_get(context, user_id,
show_deleted=show_deleted,
tenant_safe=tenant_safe)
def user_update(context, user_id, values):
@ -57,32 +59,26 @@ def user_delete(context, user_id):
return IMPL.user_delete(context, user_id)
def user_get_all(context):
return IMPL.user_get_all(context)
def user_get_all(context, show_deleted=False, limit=None,
marker=None, sort_keys=None, sort_dir=None,
filters=None):
return IMPL.user_get_all(context, show_deleted=show_deleted,
limit=limit, marker=marker,
sort_keys=sort_keys, sort_dir=sort_dir,
filters=filters)
def user_get_by_keystone_user_id(context, user_id):
return IMPL.user_get_by_keystone_user_id(context, user_id)
def rule_get(context, rule_id, show_deleted=False):
return IMPL.rule_get(context, rule_id, show_deleted=False)
def user_delete_by_keystone_user_id(context, user_id):
return IMPL.user_delete_by_keystone_user_id(context, user_id)
def user_update_by_keystone_user_id(context, user_id, values):
return IMPL.user_update_by_keystone_user_id(context, user_id, values)
def rule_get(context, rule_id):
return IMPL.rule_get(context, rule_id)
def rule_get_all(context):
return IMPL.rule_get_all(context)
def get_rule_by_filters(context, **filters):
return IMPL.get_rule_by_filters(context, **filters)
def rule_get_all(context, show_deleted=False, limit=None,
marker=None, sort_keys=None, sort_dir=None,
filters=None):
return IMPL.rule_get_all(context, show_deleted=show_deleted,
limit=limit, marker=marker,
sort_keys=sort_keys, sort_dir=sort_dir,
filters=filters)
def rule_create(context, values):
@ -97,19 +93,20 @@ def rule_delete(context, rule_id):
return IMPL.rule_delete(context, rule_id)
def resource_get(context, resource_id):
return IMPL.resource_get(context, resource_id)
def resource_get(context, resource_id, show_deleted=False, tenant_safe=True):
return IMPL.resource_get(context, resource_id,
show_deleted=show_deleted,
tenant_safe=tenant_safe)
def resource_get_all(context, **filters):
return IMPL.resource_get_all(context, **filters)
def resource_get_by_physical_resource_id(context,
physical_resource_id,
resource_type):
return IMPL.resource_get_by_physical_resource_id(
context, physical_resource_id, resource_type)
def resource_get_all(context, user_id=None, show_deleted=False,
limit=None, marker=None, sort_keys=None,
sort_dir=None, filters=None, tenant_safe=True):
return IMPL.resource_get_all(context, user_id=user_id,
show_deleted=show_deleted,
limit=limit, marker=marker,
sort_keys=sort_keys, sort_dir=sort_dir,
filters=filters, tenant_safe=tenant_safe)
def resource_create(context, values):
@ -120,45 +117,24 @@ def resource_update(context, resource_id, values):
return IMPL.resource_update(context, resource_id, values)
def resource_update_by_resource(context, resource):
return IMPL.resource_update_by_resource(context, resource)
def resource_delete(context, resource_id):
IMPL.resource_delete(context, resource_id)
def resource_delete_by_user_id(context, user_id):
IMPL.resource_delete(context, user_id)
def event_get(context, event_id, tenant_safe=True):
return IMPL.event_get(context, event_id, tenant_safe=tenant_safe)
def resource_delete_by_physical_resource_id(context,
physical_resource_id,
resource_type):
return IMPL.resource_delete_by_physical_resource_id(
context, physical_resource_id, resource_type)
def event_get(context, event_id):
return IMPL.event_get(context, event_id)
def event_get_by_user_id(context, user_id):
return IMPL.event_get_by_user_id(context, user_id)
def event_get_by_user_and_resource(context,
user_id,
resource_type,
action=None):
return IMPL.event_get_by_user_and_resource(context,
user_id,
resource_type,
action)
def events_get_all_by_filters(context, **filters):
return IMPL.events_get_all_by_filters(context, **filters)
def event_get_all(context, user_id=None, show_deleted=False,
filters=None, limit=None, marker=None,
sort_keys=None, sort_dir=None, tenant_safe=True,
start_time=None, end_time=None):
return IMPL.event_get_all(context, user_id=user_id,
show_deleted=show_deleted,
filters=filters, limit=limit,
marker=marker, sort_keys=sort_keys,
sort_dir=sort_dir, tenant_safe=tenant_safe,
start_time=start_time, end_time=end_time)
def event_create(context, values):
@ -169,24 +145,12 @@ def event_delete(context, event_id):
return IMPL.event_delete(context, event_id)
def event_delete_by_user_id(context, user_id):
return IMPL.event_delete_by_user_id(context, user_id)
def job_create(context, values):
return IMPL.job_create(context, values)
def job_get(context, job_id):
return IMPL.job_get(context, job_id)
def job_get_by_engine_id(context, engine_id):
return IMPL.job_get_by_engine_id(context, engine_id)
def job_update(context, job_id, values):
return IMPL.job_update(context, job_id, values)
def job_get_all(context, engine_id=None):
return IMPL.job_get_all(context, engine_id=engine_id)
def job_delete(context, job_id):

View File

@ -12,17 +12,20 @@
# under the License.
'''Implementation of SQLAlchemy backend.'''
import six
import sys
from oslo_config import cfg
from oslo_db.sqlalchemy import session as db_session
from oslo_db.sqlalchemy import utils
from oslo_log import log as logging
from sqlalchemy.orm.session import Session
from sqlalchemy.sql import func
from bilean.common import exception
from bilean.common.i18n import _
from bilean.common import params
from bilean.db.sqlalchemy import filters as db_filters
from bilean.db.sqlalchemy import migration
from bilean.db.sqlalchemy import models
@ -54,19 +57,49 @@ def model_query(context, *args):
return query
def _get_sort_keys(sort_keys, mapping):
'''Returns an array containing only whitelisted keys
:param sort_keys: an array of strings
:param mapping: a mapping from keys to DB column names
:returns: filtered list of sort keys
'''
if isinstance(sort_keys, six.string_types):
sort_keys = [sort_keys]
return [mapping[key] for key in sort_keys or [] if key in mapping]
def _paginate_query(context, query, model, limit=None, marker=None,
sort_keys=None, sort_dir=None, default_sort_keys=None):
if not sort_keys:
sort_keys = default_sort_keys or []
if not sort_dir:
sort_dir = 'asc'
model_marker = None
if marker:
model_marker = model_query(context, model).get(marker)
try:
query = utils.paginate_query(query, model, limit, sort_keys,
model_marker, sort_dir)
except utils.InvalidSortKey:
raise exception.InvalidParameter(name='sort_keys', value=sort_keys)
return query
def soft_delete_aware_query(context, *args, **kwargs):
"""Query helper that accounts for context's `show_deleted` field.
"""Query helper that accounts for context's `show_deleted` field.
:param show_deleted: if True, overrides context's show_deleted field.
"""
:param show_deleted: if True, overrides context's show_deleted field.
"""
query = model_query(context, *args)
show_deleted = kwargs.get('show_deleted') or context.show_deleted
query = model_query(context, *args)
show_deleted = kwargs.get('show_deleted') or context.show_deleted
if not show_deleted:
query = query.filter_by(deleted_at=None)
if not show_deleted:
query = query.filter_by(deleted_at=None)
return query
return query
def _session(context):
@ -83,13 +116,18 @@ def db_version(engine):
return migration.db_version(engine)
def user_get(context, user_id):
result = model_query(context, models.User).get(user_id)
def user_get(context, user_id, show_deleted=False, tenant_safe=True):
query = model_query(context, models.User)
user = query.get(user_id)
if not result:
raise exception.NotFound(_('User with id %s not found') % user_id)
deleted_ok = show_deleted or context.show_deleted
if user is None or user.deleted_at is not None and not deleted_ok:
return None
return result
if tenant_safe and context.tenant_id != user.user_id:
return None
return user
def user_update(context, user_id, values):
@ -103,7 +141,6 @@ def user_update(context, user_id, values):
user.update(values)
user.save(_session(context))
return user_get(context, user_id)
def user_create(context, values):
@ -114,42 +151,82 @@ def user_create(context, values):
def user_delete(context, user_id):
session = _session(context)
user = user_get(context, user_id)
session = Session.object_session(user)
session.delete(user)
if not user:
raise exception.NotFound(_('Attempt to delete a user with id: '
'%(id)s %(msg)s') % {
'id': user_id,
'msg': 'that does not exist'})
# Delete all related resource records
for resource in user.resources:
session.delete(resource)
# Delete all related event records
for event in user.events:
session.delete(event)
user.soft_delete(session=session)
session.flush()
def user_get_all(context):
results = model_query(context, models.User).all()
def user_get_all(context, show_deleted=False, limit=None,
marker=None, sort_keys=None, sort_dir=None,
filters=None):
query = soft_delete_aware_query(context, models.User,
show_deleted=show_deleted)
if not results:
if filters is None:
filters = {}
sort_key_map = {
params.USER_CREATED_AT: models.User.created_at.key,
params.USER_UPDATED_AT: models.User.updated_at.key,
params.USER_BALANCE: models.User.balance.key,
params.USER_STATUS: models.User.status.key,
}
keys = _get_sort_keys(sort_keys, sort_key_map)
query = db_filters.exact_filter(query, models.User, filters)
return _paginate_query(context, query, models.User,
limit=limit, marker=marker,
sort_keys=keys, sort_dir=sort_dir,
default_sort_keys=['created_at']).all()
def rule_get(context, rule_id, show_deleted=False):
query = model_query(context, models.Rule)
rule = query.get(rule_id)
deleted_ok = show_deleted or context.show_deleted
if rule is None or rule.deleted_at is not None and not deleted_ok:
return None
return results
return rule
def rule_get(context, rule_id):
result = model_query(context, models.Rule).get(rule_id)
def rule_get_all(context, show_deleted=False, limit=None,
marker=None, sort_keys=None, sort_dir=None,
filters=None):
query = soft_delete_aware_query(context, models.Rule,
show_deleted=show_deleted)
if not result:
raise exception.NotFound(_('Rule with id %s not found') % rule_id)
if filters is None:
filters = {}
return result
sort_key_map = {
params.RULE_NAME: models.Rule.name.key,
params.RULE_TYPE: models.Rule.type.key,
params.RULE_CREATED_AT: models.Rule.created_at.key,
params.RULE_UPDATED_AT: models.Rule.updated_at.key,
}
keys = _get_sort_keys(sort_keys, sort_key_map)
def rule_get_all(context):
return model_query(context, models.Rule).all()
def get_rule_by_filters(context, **filters):
filter_keys = filters.keys()
query = model_query(context, models.Rule)
if "resource_type" in filter_keys:
query = query.filter_by(resource_type=filters["resource_type"])
if "size" in filter_keys:
query = query.filter_by(size=filters["size"])
return query.all()
query = db_filters.exact_filter(query, models.Rule, filters)
return _paginate_query(context, query, models.Rule,
limit=limit, marker=marker,
sort_keys=keys, sort_dir=sort_dir,
default_sort_keys=['created_at']).all()
def rule_create(context, values):
@ -174,55 +251,58 @@ def rule_update(context, rule_id, values):
def rule_delete(context, rule_id):
rule = rule_get(context, rule_id)
if not rule:
raise exception.NotFound(_('Attempt to delete a rule with id: '
'%(id)s %(msg)s') % {
'id': rule_id,
'msg': 'that does not exist'})
session = Session.object_session(rule)
session.delete(rule)
rule.soft_delete(session=session)
session.flush()
def resource_get(context, resource_id):
result = model_query(context, models.Resource).get(resource_id)
def resource_get(context, resource_id, show_deleted=False, tenant_safe=True):
query = model_query(context, models.Resource)
resource = query.get(resource_id)
if not result:
raise exception.NotFound(_('Resource with id %s not found') %
resource_id)
deleted_ok = show_deleted or context.show_deleted
if resource is None or resource.deleted_at is not None and not deleted_ok:
return None
return result
if tenant_safe and context.tenant_id != resource.user_id:
return None
return resource
def resource_get_by_physical_resource_id(context,
physical_resource_id,
resource_type):
result = (model_query(context, models.Resource)
.filter_by(resource_ref=physical_resource_id)
.filter_by(resource_type=resource_type)
.first())
def resource_get_all(context, user_id=None, show_deleted=False,
limit=None, marker=None, sort_keys=None, sort_dir=None,
filters=None, tenant_safe=True):
query = soft_delete_aware_query(context, models.Resource,
show_deleted=show_deleted)
if not result:
raise exception.NotFound(_('Resource with physical_resource_id: '
'%(resource_id)s, resource_type: '
'%(resource_type)s not found.') % {
'resource_id': physical_resource_id,
'resource_type': resource_type})
if tenant_safe:
query = query.filter_by(user_id=context.tenant_id)
return result
elif user_id:
query = query.filter_by(user_id=user_id)
if filters is None:
filters = {}
def resource_get_all(context, **filters):
if filters.get('show_deleted') is None:
filters['show_deleted'] = False
query = soft_delete_aware_query(context, models.Resource, **filters)
if "resource_type" in filters:
query = query.filter_by(resource_type=filters["resource_type"])
if "user_id" in filters:
query = query.filter_by(user_id=filters["user_id"])
return query.all()
def resource_get_by_user_id(context, user_id, show_deleted=False):
query = soft_delete_aware_query(
context, models.Resource, show_deleted=show_deleted
).filter_by(user_id=user_id).all()
return query
sort_key_map = {
params.RES_CREATED_AT: models.Resource.created_at.key,
params.RES_UPDATED_AT: models.Resource.updated_at.key,
params.RES_RESOURCE_TYPE: models.Resource.resource_type.key,
params.RES_USER_ID: models.Resource.user_id.key,
}
keys = _get_sort_keys(sort_keys, sort_key_map)
query = db_filters.exact_filter(query, models.Resource, filters)
return _paginate_query(context, query, models.Node,
limit=limit, marker=marker,
sort_keys=keys, sort_dir=sort_dir,
default_sort_keys=['created_at']).all()
def resource_create(context, values):
@ -246,23 +326,14 @@ def resource_update(context, resource_id, values):
return resource
def resource_update_by_resource(context, res):
resource = resource_get_by_physical_resource_id(
context, res['resource_ref'], res['resource_type'])
if not resource:
raise exception.NotFound(_('Attempt to update a resource: '
'%(res)s %(msg)s') % {
'res': res,
'msg': 'that does not exist'})
resource.update(res)
resource.save(_session(context))
return resource
def resource_delete(context, resource_id, soft_delete=True):
resource = resource_get(context, resource_id)
if not resource:
raise exception.NotFound(_('Attempt to delete a resource with id: '
'%(id)s %(msg)s') % {
'id': resource_id,
'msg': 'that does not exist'})
session = Session.object_session(resource)
if soft_delete:
resource.soft_delete(session=session)
@ -271,84 +342,52 @@ def resource_delete(context, resource_id, soft_delete=True):
session.flush()
def resource_delete_by_physical_resource_id(context,
physical_resource_id,
resource_type,
soft_delete=True):
resource = resource_get_by_physical_resource_id(
context, physical_resource_id, resource_type)
session = Session.object_session(resource)
if soft_delete:
resource.soft_delete(session=session)
else:
session.delete(resource)
session.flush()
def event_get(context, event_id, tenant_safe=True):
query = model_query(context, models.Event)
event = query.get(event_id)
if event is None:
return None
if tenant_safe and context.tenant_id != event.user_id:
return None
return event
def resource_delete_by_user_id(context, user_id):
resource = resource_get_by_user_id(context, user_id)
session = Session.object_session(resource)
session.delete(resource)
session.flush()
def event_get_all(context, user_id=None, show_deleted=False,
filters=None, limit=None, marker=None, sort_keys=None,
sort_dir=None, tenant_safe=True, start_time=None,
end_time=None):
query = soft_delete_aware_query(context, models.Event,
show_deleted=show_deleted)
if tenant_safe:
query = query.filter_by(user_id=context.tenant_id)
def event_get(context, event_id):
result = model_query(context, models.Event).get(event_id)
if not result:
raise exception.NotFound(_('Event with id %s not found') % event_id)
return result
def event_get_by_user_id(context, user_id):
query = model_query(context, models.Event).filter_by(user_id=user_id)
return query
def event_get_by_user_and_resource(context,
user_id,
resource_type,
action=None):
query = (model_query(context, models.Event)
.filter_by(user_id=user_id)
.filter_by(resource_type=resource_type)
.filter_by(action=action).all())
return query
def events_get_all_by_filters(context,
user_id=None,
resource_type=None,
start=None,
end=None,
action=None,
aggregate=None):
if aggregate == 'sum':
query_prefix = model_query(
context, models.Event.resource_type, func.sum(models.Event.value)
).group_by(models.Event.resource_type)
elif aggregate == 'avg':
query_prefix = model_query(
context, models.Event.resource_type, func.avg(models.Event.value)
).group_by(models.Event.resource_type)
else:
query_prefix = model_query(context, models.Event)
if not context.is_admin:
if context.tenant_id:
query_prefix = query_prefix.filter_by(user_id=context.tenant_id)
elif user_id:
query_prefix = query_prefix.filter_by(user_id=user_id)
if resource_type:
query_prefix = query_prefix.filter_by(resource_type=resource_type)
if action:
query_prefix = query_prefix.filter_by(action=action)
if start:
query_prefix = query_prefix.filter(models.Event.created_at >= start)
if end:
query_prefix = query_prefix.filter(models.Event.created_at <= end)
query = query.filter_by(user_id=user_id)
return query_prefix.all()
if start_time:
query = query.filter_by(models.Event.timestamp >= start_time)
if end_time:
query = query.filter_by(models.Event.timestamp <= end_time)
if filters is None:
filters = {}
sort_key_map = {
params.EVENT_ACTION: models.Event.action.key,
params.EVENT_RESOURCE_TYPE: models.Event.resource_type.key,
params.EVENT_TIMESTAMP: models.Event.timestamp.key,
params.EVENT_USER_ID: models.Event.user_id.key,
}
keys = _get_sort_keys(sort_keys, sort_key_map)
query = db_filters.exact_filter(query, models.Resource, filters)
return _paginate_query(context, query, models.Node,
limit=limit, marker=marker,
sort_keys=keys, sort_dir=sort_dir,
default_sort_keys=['timestamp']).all()
def event_create(context, values):
@ -358,20 +397,6 @@ def event_create(context, values):
return event_ref
def event_delete(context, event_id):
event = event_get(context, event_id)
session = Session.object_session(event)
session.delete(event)
session.flush()
def event_delete_by_user_id(context, user_id):
event = event_get(context, user_id)
session = Session.object_session(event)
session.delete(event)
session.flush()
def job_create(context, values):
job_ref = models.Job()
job_ref.update(values)
@ -379,37 +404,22 @@ def job_create(context, values):
return job_ref
def job_get(context, job_id):
result = model_query(context, models.Job).get(job_id)
def job_get_all(context, engine_id=None):
query = model_query(context, models.Job)
if engine_id:
query = query.filter_by(engine_id=engine_id)
if not result:
raise exception.NotFound(_('Job with id %s not found') % job_id)
return result
def job_get_by_engine_id(context, engine_id):
query = (model_query(context, models.Job)
.filter_by(engine_id=engine_id).all())
return query
def job_update(context, job_id, values):
job = job_get(context, job_id)
if not job:
raise exception.NotFound(_('Attempt to update a job with id: '
'%(id)s %(msg)s') % {
'id': job_id,
'msg': 'that does not exist'})
job.update(values)
job.save(_session(context))
return job
return query.all()
def job_delete(context, job_id):
job = job_get(context, job_id)
job = model_query(context, models.Job).get(job_id)
if job is None:
raise exception.NotFound(_('Attempt to delete a job with id: '
'%(id)s %(msg)s') % {
'id': job_id,
'msg': 'that does not exist'})
session = Session.object_session(job)
session.delete(job)
session.flush()

View File

@ -1,15 +1,16 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
def exact_filter(query, model, filters):
@ -31,7 +32,7 @@ def exact_filter(query, model, filters):
if filters is None:
filters = {}
for key, value in filters.iteritems():
for key, value in six.iteritems(filters):
if isinstance(value, (list, tuple, set, frozenset)):
column_attr = getattr(model, key)
query = query.filter(column_attr.in_(value))

View File

@ -24,14 +24,19 @@ def upgrade(migrate_engine):
'user', meta,
sqlalchemy.Column('id', sqlalchemy.String(36), primary_key=True,
nullable=False),
sqlalchemy.Column('created_at', sqlalchemy.DateTime),
sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
sqlalchemy.Column('policy_id',
sqlalchemy.String(36),
sqlalchemy.ForeignKey('policy.id'),
nullable=True),
sqlalchemy.Column('balance', sqlalchemy.Float),
sqlalchemy.Column('rate', sqlalchemy.Float),
sqlalchemy.Column('credit', sqlalchemy.Integer),
sqlalchemy.Column('last_bill', sqlalchemy.DateTime),
sqlalchemy.Column('status', sqlalchemy.String(10)),
sqlalchemy.Column('status_reason', sqlalchemy.String(255)),
sqlalchemy.Column('created_at', sqlalchemy.DateTime),
sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
sqlalchemy.Column('deleted_at', sqlalchemy.DateTime),
mysql_engine='InnoDB',
mysql_charset='utf8'
)
@ -55,12 +60,6 @@ def upgrade(migrate_engine):
'resource', meta,
sqlalchemy.Column('id', sqlalchemy.String(36), primary_key=True,
nullable=False),
sqlalchemy.Column('resource_ref', sqlalchemy.String(36),
nullable=False),
sqlalchemy.Column('created_at', sqlalchemy.DateTime),
sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
sqlalchemy.Column('deleted_at', sqlalchemy.DateTime),
sqlalchemy.Column('deleted', sqlalchemy.Boolean, default=False),
sqlalchemy.Column('user_id',
sqlalchemy.String(36),
sqlalchemy.ForeignKey('user.id'),
@ -71,10 +70,11 @@ def upgrade(migrate_engine):
nullable=False),
sqlalchemy.Column('resource_type', sqlalchemy.String(36),
nullable=False),
sqlalchemy.Column('size', sqlalchemy.String(36), nullable=False),
sqlalchemy.Column('properties', types.Dict),
sqlalchemy.Column('rate', sqlalchemy.Float, nullable=False),
sqlalchemy.Column('status', sqlalchemy.String(10)),
sqlalchemy.Column('status_reason', sqlalchemy.String(255)),
sqlalchemy.Column('created_at', sqlalchemy.DateTime),
sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
sqlalchemy.Column('deleted_at', sqlalchemy.DateTime),
mysql_engine='InnoDB',
mysql_charset='utf8'
)
@ -85,9 +85,7 @@ def upgrade(migrate_engine):
primary_key=True, nullable=False),
sqlalchemy.Column('user_id', sqlalchemy.String(36),
sqlalchemy.ForeignKey('user.id'), nullable=False),
sqlalchemy.Column('created_at', sqlalchemy.DateTime),
sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
sqlalchemy.Column('resource_id', sqlalchemy.String(36)),
sqlalchemy.Column('timestamp', sqlalchemy.DateTime),
sqlalchemy.Column('resource_type', sqlalchemy.String(36)),
sqlalchemy.Column('action', sqlalchemy.String(36)),
sqlalchemy.Column('value', sqlalchemy.Float),

View File

@ -1,57 +0,0 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
def upgrade(migrate_engine):
meta = sqlalchemy.MetaData()
meta.bind = migrate_engine
services = sqlalchemy.Table(
'services', meta,
sqlalchemy.Column('id', sqlalchemy.String(36),
primary_key=True, nullable=False),
sqlalchemy.Column('created_at', sqlalchemy.DateTime),
sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
sqlalchemy.Column('deleted_at', sqlalchemy.DateTime),
sqlalchemy.Column('deleted', sqlalchemy.Boolean),
sqlalchemy.Column('host', sqlalchemy.String(length=255)),
sqlalchemy.Column('binary', sqlalchemy.String(length=255)),
sqlalchemy.Column('topic', sqlalchemy.String(length=255)),
sqlalchemy.Column('report_count', sqlalchemy.Integer, nullable=False),
sqlalchemy.Column('disabled', sqlalchemy.Boolean),
mysql_engine='InnoDB',
mysql_charset='utf8'
)
try:
services.create()
except Exception:
LOG.error("Table |%s| not created!", repr(services))
raise
def downgrade(migrate_engine):
meta = sqlalchemy.MetaData()
meta.bind = migrate_engine
services = sqlalchemy.Table('services', meta, autoload=True)
try:
services.drop()
except Exception:
LOG.error("services table not dropped")
raise

View File

@ -11,7 +11,7 @@
# License for the specific language governing permissions and limitations
# under the License.
"""
SQLAlchemy models for heat data.
SQLAlchemy models for Bilean data.
"""
import uuid
@ -19,7 +19,6 @@ import uuid
from bilean.db.sqlalchemy import types
from oslo_db.sqlalchemy import models
from oslo_serialization import jsonutils
from oslo_utils import timeutils
import sqlalchemy
@ -36,7 +35,7 @@ def get_session():
return db_api.get_session()
class BileanBase(models.ModelBase, models.TimestampMixin):
class BileanBase(models.ModelBase):
"""Base class for Heat Models."""
__table_args__ = {'mysql_engine': 'InnoDB'}
@ -78,12 +77,11 @@ class BileanBase(models.ModelBase, models.TimestampMixin):
class SoftDelete(object):
deleted_at = sqlalchemy.Column(sqlalchemy.DateTime)
deleted = sqlalchemy.Column(sqlalchemy.Boolean, default=False)
def soft_delete(self, session=None):
"""Mark this object as deleted."""
self.update_and_save({'deleted_at': timeutils.utcnow(),
'deleted': True}, session=session)
self.update_and_save({'deleted_at': timeutils.utcnow()},
session=session)
class StateAware(object):
@ -100,20 +98,36 @@ class StateAware(object):
self._status_reason = reason and reason[:255] or ''
class User(BASE, BileanBase, StateAware):
class User(BASE, BileanBase, SoftDelete, StateAware, models.TimestampMixin):
"""Represents a user to record account"""
__tablename__ = 'user'
id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True)
policy_id = sqlalchemy.Column(
sqlalchemy.String(36),
sqlalchemy.ForeignKey('policy.id'),
nullable=True)
balance = sqlalchemy.Column(sqlalchemy.Float, default=0.0)
rate = sqlalchemy.Column(sqlalchemy.Float, default=0.0)
credit = sqlalchemy.Column(sqlalchemy.Integer, default=0)
last_bill = sqlalchemy.Column(
sqlalchemy.DateTime, default=timeutils.utcnow())
updated_at = sqlalchemy.Column(sqlalchemy.DateTime)
class Rule(BASE, BileanBase):
class Policy(BASE, BileanBase, SoftDelete, models.TimestampMixin):
"""Represents a policy to collect rules."""
__tablename__ = 'policy'
id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True,
default=lambda: str(uuid.uuid4()))
rules = sqlalchemy.Column(types.List)
is_default = sqlalchemy.Column(sqlalchemy.Boolean, default=False)
meta_data = sqlalchemy.Column(types.dict)
class Rule(BASE, BileanBase, SoftDelete, models.TimestampMixin):
"""Represents a rule created to bill someone resource"""
__tablename__ = 'rule'
@ -123,16 +137,14 @@ class Rule(BASE, BileanBase):
type = sqlalchemy.Column(sqlalchemy.String(255))
spec = sqlalchemy.Column(types.Dict)
meta_data = sqlalchemy.Column(types.Dict)
updated_at = sqlalchemy.Column(sqlalchemy.DateTime)
class Resource(BASE, BileanBase, StateAware, SoftDelete):
class Resource(BASE, BileanBase, SoftDelete, models.TimestampMixin):
"""Represents a meta resource with rate"""
__tablename__ = 'resource'
id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True,
default=lambda: str(uuid.uuid4()))
resource_ref = sqlalchemy.Column(sqlalchemy.String(36), nullable=False)
user_id = sqlalchemy.Column(
sqlalchemy.String(36),
sqlalchemy.ForeignKey('user.id'),
@ -140,16 +152,14 @@ class Resource(BASE, BileanBase, StateAware, SoftDelete):
rule_id = sqlalchemy.Column(
sqlalchemy.String(36),
sqlalchemy.ForeignKey('rule.id'),
nullable=False)
user = relationship(User, backref=backref('resource'))
rule = relationship(Rule, backref=backref('resource'))
nullable=True)
user = relationship(User, backref=backref('resources'))
resource_type = sqlalchemy.Column(sqlalchemy.String(36), nullable=False)
size = sqlalchemy.Column(sqlalchemy.String(36), nullable=False)
properties = sqlalchemy.Column(types.Dict)
rate = sqlalchemy.Column(sqlalchemy.Float, nullable=False)
updated_at = sqlalchemy.Column(sqlalchemy.DateTime)
class Event(BASE, BileanBase):
class Event(BASE, BileanBase, SoftDelete):
"""Represents an event generated by the bilean engine."""
__tablename__ = 'event'
@ -160,9 +170,9 @@ class Event(BASE, BileanBase):
user_id = sqlalchemy.Column(sqlalchemy.String(36),
sqlalchemy.ForeignKey('user.id'),
nullable=False)
user = relationship(User, backref=backref('event'))
resource_id = sqlalchemy.Column(sqlalchemy.String(36))
user = relationship(User, backref=backref('events'))
action = sqlalchemy.Column(sqlalchemy.String(36))
timestamp = sqlalchemy.Column(sqlalchemy.DateTime)
resource_type = sqlalchemy.Column(sqlalchemy.String(36))
value = sqlalchemy.Column(sqlalchemy.Float)

View File

@ -1,86 +0,0 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from oslo_utils import timeutils
from bilean.common import params
LOG = logging.getLogger(__name__)
def format_user(user, detail=False):
'''Format user object to dict
Return a representation of the given user that matches the API output
expectations.
'''
updated_at = user.updated_at and timeutils.isotime(user.updated_at)
info = {
params.USER_ID: user.id,
params.USER_BALANCE: user.balance,
params.USER_RATE: user.rate,
params.USER_CREDIT: user.credit,
params.USER_STATUS: user.status,
params.USER_UPDATED_AT: updated_at,
params.USER_LAST_BILL: user.last_bill
}
if detail:
info[params.USER_CREATED_AT] = user.created_at
info[params.USER_STATUS_REASION] = user.status_reason
return info
def format_bilean_resource(resource, detail=False):
'''Format resource object to dict
Return a representation of the given resource that matches the API output
expectations.
'''
updated_at = resource.updated_at and timeutils.isotime(resource.updated_at)
info = {
params.RES_ID: resource.id,
params.RES_RESOURCE_TYPE: resource.resource_type,
params.RES_SIZE: resource.size,
params.RES_RATE: resource.rate,
params.RES_STATUS: resource.status,
params.RES_USER_ID: resource.user_id,
params.RES_RESOURCE_REF: resource.resource_ref,
params.RES_UPDATED_AT: updated_at,
}
if detail:
info[params.RES_CREATED_AT] = resource.created_at
info[params.RES_RULE_ID] = resource.rule_id
info[params.RES_STATUS_REASION] = resource.status_reason
return info
def format_rule(rule):
'''Format rule object to dict
Return a representation of the given rule that matches the API output
expectations.
'''
updated_at = rule.updated_at and timeutils.isotime(rule.updated_at)
info = {
params.RULE_ID: rule.id,
params.RULE_RESOURCE_TYPE: rule.resource_type,
params.RULE_SIZE: rule.size,
params.RULE_PARAMS: rule.params,
params.RULE_UPDATED_AT: updated_at,
params.RULE_CREATED_AT: rule.created_at,
}
return info

View File

@ -1,112 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_context import context as oslo_context
from oslo_log import log as logging
import oslo_messaging
from oslo_service import service
from bilean.common import params
from bilean.common.i18n import _LI
from bilean.common import messaging as bilean_messaging
LOG = logging.getLogger(__name__)
OPERATIONS = (
START_ACTION, CANCEL_ACTION, STOP
) = (
'start_action', 'cancel_action', 'stop'
)
class Dispatcher(service.Service):
'''Listen on an AMQP queue named for the engine.
Receive notification from engine services and schedule actions.
'''
def __init__(self, engine_service, topic, version, thread_group_mgr):
super(Dispatcher, self).__init__()
self.TG = thread_group_mgr
self.engine_id = engine_service.engine_id
self.topic = topic
self.version = version
def start(self):
super(Dispatcher, self).start()
self.target = oslo_messaging.Target(server=self.engine_id,
topic=self.topic,
version=self.version)
server = bilean_messaging.get_rpc_server(self.target, self)
server.start()
def listening(self, ctxt):
'''Respond affirmatively to confirm that engine is still alive.'''
return True
def start_action(self, ctxt, action_id=None):
self.TG.start_action(self.engine_id, action_id)
def cancel_action(self, ctxt, action_id):
'''Cancel an action.'''
self.TG.cancel_action(action_id)
def suspend_action(self, ctxt, action_id):
'''Suspend an action.'''
self.TG.suspend_action(action_id)
def resume_action(self, ctxt, action_id):
'''Resume an action.'''
self.TG.resume_action(action_id)
def stop(self):
super(Dispatcher, self).stop()
# Wait for all action threads to be finished
LOG.info(_LI("Stopping all action threads of engine %s"),
self.engine_id)
# Stop ThreadGroup gracefully
self.TG.stop(True)
LOG.info(_LI("All action threads have been finished"))
def notify(method, engine_id=None, **kwargs):
'''Send notification to dispatcher
:param method: remote method to call
:param engine_id: dispatcher to notify; None implies broadcast
'''
client = bilean_messaging.get_rpc_client(version=params.RPC_API_VERSION)
if engine_id:
# Notify specific dispatcher identified by engine_id
call_context = client.prepare(
version=params.RPC_API_VERSION,
topic=params.ENGINE_DISPATCHER_TOPIC,
server=engine_id)
else:
# Broadcast to all disptachers
call_context = client.prepare(
version=params.RPC_API_VERSION,
topic=params.ENGINE_DISPATCHER_TOPIC)
try:
# We don't use ctext parameter in action progress
# actually. But since RPCClient.call needs this param,
# we use oslo current context here.
call_context.call(oslo_context.get_current(), method, **kwargs)
return True
except oslo_messaging.MessagingTimeout:
return False
def start_action(engine_id=None, **kwargs):
return notify(START_ACTION, engine_id, **kwargs)

View File

@ -13,11 +13,14 @@
import six
from bilean.common import exception
from bilean.common.i18n import _
from bilean.common import utils
from bilean.db import api as db_api
from bilean.engine import resource as bilean_resources
from oslo_log import log as logging
from oslo_utils import timeutils
LOG = logging.getLogger(__name__)
@ -30,7 +33,6 @@ class Event(object):
self.user_id = kwargs.get('user_id', None)
self.action = kwargs.get('action', None)
self.resource_type = kwargs.get('resource_type', None)
self.action = kwargs.get('action', None)
self.value = kwargs.get('value', 0)
@classmethod
@ -48,26 +50,31 @@ class Event(object):
return cls(record.timestamp, **kwargs)
@classmethod
def load(cls, context, db_event=None, event_id=None, project_safe=True):
def load(cls, context, db_event=None, event_id=None, tenant_safe=True):
'''Retrieve an event record from database.'''
if db_event is not None:
return cls.from_db_record(db_event)
record = db_api.event_get(context, event_id, project_safe=project_safe)
record = db_api.event_get(context, event_id, tenant_safe=tenant_safe)
if record is None:
raise exception.EventNotFound(event=event_id)
return cls.from_db_record(record)
@classmethod
def load_all(cls, context, filters=None, limit=None, marker=None,
sort_keys=None, sort_dir=None, project_safe=True):
def load_all(cls, context, user_id=None, show_deleted=False,
filters=None, limit=None, marker=None, sort_keys=None,
sort_dir=None, tenant_safe=True, start_time=None,
end_time=None):
'''Retrieve all events from database.'''
records = db_api.event_get_all(context, limit=limit, marker=marker,
records = db_api.event_get_all(context, user_id=user_id, limit=limit,
show_deleted=show_deleted,
marker=marker, filters=filters,
sort_keys=sort_keys, sort_dir=sort_dir,
filters=filters,
project_safe=project_safe)
tenant_safe=tenant_safe,
start_time=start_time,
end_time=end_time)
for record in records:
yield cls.from_db_record(record)
@ -120,17 +127,18 @@ def record(context, user_id, action=None, seconds=0, value=0):
resources = bilean_resources.resource_get_all(
context, user_id=user_id)
for resource in resources:
usage = resource['rate'] / 3600.0 * time_length
event_create(context,
user_id=user_id,
resource_id=resource['id'],
resource_type=resource['resource_type'],
action=action,
value=usage)
usage = resource['rate'] * seconds
event = Event(timeutils.utcnow(),
user_id=user_id,
action=action,
resource_type=resource['resource_type'],
value=usage)
event.store(context)
else:
event_create(context,
user_id=user_id,
action=action,
value=recharge_value)
event = Event(timeutils.utcnow(),
user_id=user_id,
action=action,
value=value)
event.store(context)
except Exception as exc:
LOG.error(_("Error generate events: %s") % six.text_type(exc))

View File

@ -11,6 +11,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from bilean.common import exception
from bilean.common import utils
from bilean.db import api as db_api
from bilean.rules import base as rule_base
@ -30,9 +31,8 @@ class Policy(object):
self.updated_at = kwargs.get('updated_at', None)
self.deleted_at = kwargs.get('deleted_at', None)
def store(context, values):
"""Store the policy record into database table.
"""
def store(self, context, values):
"""Store the policy record into database table."""
values = {
'rules': self.rules,
@ -71,12 +71,12 @@ class Policy(object):
@classmethod
def load(cls, context, policy_id=None, policy=None, show_deleted=False,
project_safe=True):
tenant_safe=True):
'''Retrieve a policy from database.'''
if policy is None:
policy = db_api.policy_get(context, policy_id,
show_deleted=show_deleted,
project_safe=project_safe)
tenant_safe=tenant_safe)
if policy is None:
raise exception.PolicyNotFound(policy=policy_id)
@ -85,7 +85,7 @@ class Policy(object):
@classmethod
def load_all(cls, context, show_deleted=False, limit=None,
marker=None, sort_keys=None, sort_dir=None,
filters=None, project_safe=True):
filters=None, tenant_safe=True):
'''Retrieve all policies of from database.'''
records = db_api.policy_get_all(context, show_deleted=show_deleted,
@ -93,7 +93,7 @@ class Policy(object):
sort_keys=sort_keys,
sort_dir=sort_dir,
filters=filters,
project_safe=project_safe)
tenant_safe=tenant_safe)
return [cls._from_db_record(record) for record in records]

View File

@ -11,9 +11,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from bilean.common import exception
from bilean.common import utils
from bilean.db import api as db_api
from bilean.engine import policy as policy_mod
from bilean.engine import user as user_mod
class Resource(object):
@ -41,8 +41,7 @@ class Resource(object):
self.get_resource_price()
def store(self, context):
"""Store the resource record into database table.
"""
"""Store the resource record into database table."""
values = {
'user_id': self.user_id,
@ -83,29 +82,30 @@ class Resource(object):
@classmethod
def load(cls, context, resource_id=None, resource=None,
show_deleted=False, project_safe=True):
show_deleted=False, tenant_safe=True):
'''Retrieve a resource from database.'''
if resource is None:
resource = db_api.resource_get(context, resource_id,
show_deleted=show_deleted,
project_safe=project_safe)
tenant_safe=tenant_safe)
if resource is None:
raise exception.ResourceNotFound(resource=resource_id)
return cls._from_db_record(resource)
@classmethod
def load_all(cls, context, show_deleted=False, limit=None,
marker=None, sort_keys=None, sort_dir=None,
filters=None, project_safe=True):
def load_all(cls, context, user_id=None, show_deleted=False,
limit=None, marker=None, sort_keys=None, sort_dir=None,
filters=None, tenant_safe=True):
'''Retrieve all users of from database.'''
records = db_api.resource_get_all(context, show_deleted=show_deleted,
records = db_api.resource_get_all(context, user_id=user_id,
show_deleted=show_deleted,
limit=limit, marker=marker,
sort_keys=sort_keys,
sort_dir=sort_dir,
filters=filters,
project_safe=project_safe)
tenant_safe=tenant_safe)
return [cls._from_db_record(record) for record in records]
@ -121,7 +121,7 @@ class Resource(object):
'updated_at': utils.format_time(self.updated_at),
'deleted_at': utils.format_time(self.deleted_at),
}
return user_dict
return resource_dict
def do_delete(self, context, resource_id):
db_api.resource_delete(context, resource_id)

View File

@ -16,13 +16,17 @@ from bilean.common import exception
from bilean.common.i18n import _
from bilean.common.i18n import _LI
from bilean.db import api as db_api
from bilean.engine import user as user_mod
from bilean import notifier
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import timeutils
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import timedelta
import random
import six
bilean_task_opts = [
cfg.StrOpt('time_zone',
@ -65,7 +69,7 @@ class BileanScheduler(object):
trigger_types = (DATE, CRON) = ('date', 'cron')
def __init__(self, **kwargs):
super(BileanTask, self).__init__()
super(BileanScheduler, self).__init__()
self._scheduler = BackgroundScheduler()
self.notifier = notifier.Notifier()
self.engine_id = kwargs.get('engine_id', None)
@ -76,22 +80,22 @@ class BileanScheduler(object):
self._scheduler.add_jobstore(cfg.CONF.bilean_task.backend,
url=cfg.CONF.bilean_task.connection)
def init_scheduler(self)
def init_scheduler(self):
"""Init all jobs related to the engine from db."""
jobs = db_api.job_get_by_engine_id(self.context, self.engine_id)
jobs = db_api.job_get_all(self.context, engine_id=self.engine_id)
if not jobs:
LOG.info(_LI("No job found from db"))
return True
for job in jobs:
if self.bilean_scheduler.is_exist(job.id):
continue
task_name = "_%s_task" % (job.job_type)
task_name = "_%s_task" % (job.job_type)
task = getattr(self, task_name)
self.bilean_task.add_job(task, job.id,
job_type=job.job_type,
params=job.parameters)
def add_job(self, task, job_id, trigger_type='date', **kwargs):
def add_job(self, task, job_id, trigger_type='date', **kwargs):
"""Add a job to scheduler by given data.
:param str|unicode user_id: used as job_id
@ -112,7 +116,7 @@ class BileanScheduler(object):
args=[user_id],
id=job_id,
misfire_grace_time=mg_time)
return True
return True
# Add a cron type job
hour = kwargs.get('hour', None)

View File

@ -21,16 +21,16 @@ from oslo_service import service
from bilean.common import context as bilean_context
from bilean.common import exception
from bilean.common.i18n import _
from bilean.common.i18n import _LI
from bilean.common.i18n import _LE
from bilean.common.i18n import _LI
from bilean.common import messaging as rpc_messaging
from bilean.common import schema
from bilean.engine import scheduler
from bilean.engine import clients as bilean_clients
from bilean.engine import environment
from bilean.engine import event as event_mod
from bilean.engine import policy as policy_mod
from bilean.engine import resource as resource_mod
from bilean.engine import scheduler
from bilean.engine import user as user_mod
from bilean.rules import base as rule_base
@ -55,7 +55,7 @@ class EngineService(service.Service):
self.host = host
self.topic = topic
self.bilean_scheduler = None
self.scheduler = None
self.engine_id = None
self.target = None
self._rpc_server = None
@ -68,15 +68,15 @@ class EngineService(service.Service):
def start(self):
self.engine_id = str(uuid.uuid4())
LOG.info(_LI("initialise bilean users from keystone.")
user_mod.User.init_users(self.context)
LOG.info(_LI("initialise bilean users from keystone."))
user_mod.User.init_users(self.context)
self.bilean_scheduler = scheduler.BileanScheduler(
engine_id=self.engine_id, context=self.context)
self.scheduler = scheduler.BileanScheduler(engine_id=self.engine_id,
context=self.context)
LOG.info(_LI("Starting billing scheduler for engine: %s"),
self.engine_id)
self.bilean_scheduler.init_scheduler()
self.bilean_scheduler.start()
self.scheduler.init_scheduler()
self.scheduler.start()
LOG.info(_LI("Starting rpc server for engine: %s"), self.engine_id)
target = oslo_messaging.Target(version=self.RPC_API_VERSION,
@ -104,7 +104,7 @@ class EngineService(service.Service):
LOG.info(_LI("Stopping billing scheduler for engine: %s"),
self.engine_id)
self.bilean_scheduler.stop()
self.scheduler.stop()
super(EngineService, self).stop()
@ -128,7 +128,7 @@ class EngineService(service.Service):
user.do_recharge(cnxt, value)
# As user has been updated, the billing job for the user
# should to be updated too.
self.bilean_scheduler.update_user_job(user)
self.scheduler.update_user_job(user)
return user.to_dict()
def user_delete(self, user_id):
@ -143,14 +143,14 @@ class EngineService(service.Service):
msg = _("The specified rule type (%(type)s) is not supported."
) % {"type": type_name}
raise exception.BileanBadRequest(msg=msg)
LOG.info(_LI("Creating rule type: %(type)s, name: %(name)s."),
{'type': type_name, 'name': name})
rule = plugin(name, spec, metadata=metadata)
try:
rule.validate()
except exception.InvalidSpec as ex:
msg = six.text_type()
msg = six.text_type(ex)
LOG.error(_LE("Failed in creating rule: %s"), msg)
raise exception.BileanBadRequest(msg=msg)
@ -189,7 +189,7 @@ class EngineService(service.Service):
count = resources.get('count', 1)
total_rate = 0
for resource in resources['resources']:
rule = policy.find_rule(cnxt, resource['resource_type'])
rule = policy.find_rule(cnxt, resource['resource_type'])
res = resource_mod.Resource('FAKE_ID', user.id,
resource['resource_type'],
resource['properties'])
@ -197,7 +197,7 @@ class EngineService(service.Service):
if count > 1:
total_rate = total_rate * count
# Pre 1 hour bill for resources
pre_bill = ratecount * 3600
pre_bill = total_rate * 3600
if pre_bill > user.balance:
return dict(validation=False)
return dict(validation=True)
@ -225,22 +225,22 @@ class EngineService(service.Service):
# Update user with resource
user.update_with_resource(self.context, resource)
# As the rate of user has changed, the billing job for the user
# should change too. self.bilean_scheduler.update_user_job(user)
# should change too. self.scheduler.update_user_job(user)
return resource.to_dict()
@bilean_context.request_context
def resource_list(self, cnxt, show_deleted=False, limit=None,
marker=None, sort_keys=None, sort_dir=None,
filters=None, project_safe=True):
marker=None, sort_keys=None, sort_dir=None,
filters=None, tenant_safe=True):
resources = resource_mod.Resource.load_all(cnxt, filters=filters,
show_deleted=show_deleted,
limit=limit, marker=marker,
sort_keys=sort_keys,
sort_dir=sort_dir,
project_safe=project_safe))
tenant_safe=tenant_safe)
return {'resources': [r.to_dict() for r in resources]}
@bilean_context.request_context
@ -254,7 +254,7 @@ class EngineService(service.Service):
self.context, resource_id=resource['id'])
old_rate = res.rate
res.properties = resource['properties']
rule = rule_mod.Rule.load(self.context, rule_id=res.rule_id)
rule = rule_base.Rule.load(self.context, rule_id=res.rule_id)
res.rate = rule.get_price(res)
res.store(self.context)
res.d_rate = res.rate - old_rate
@ -262,15 +262,15 @@ class EngineService(service.Service):
user = user_mod.User.load(self.context, res.user_id)
user.update_with_resource(self.context, res, action='update')
self.bilean_scheduler.update_user_job(user)
self.scheduler.update_user_job(user)
def resource_delete(self, resource):
"""Do resource delete"""
res = resource_mod.Resource.load(
self.context, resource_id=resource['id'])
user = user_mod.User.load(self.context, user_id=resource['user_id']
user = user_mod.User.load(self.context, user_id=resource['user_id'])
user.update_with_resource(self.context, res, action='delete')
self.bilean_scheduler.update_user_job(user)
self.scheduler.update_user_job(user)
try:
res.do_delete(self.context)
except Exception as ex:
@ -279,11 +279,11 @@ class EngineService(service.Service):
@bilean_context.request_context
def event_list(self, cnxt, filters=None, limit=None, marker=None,
sort_keys=None, sort_dir=None, project_safe=True):
events = event_mod.Event.load_all(context, limit=limit,
sort_keys=None, sort_dir=None, tenant_safe=True):
events = event_mod.Event.load_all(cnxt, limit=limit,
marker=marker,
sort_keys=sort_keys,
sort_dir=sort_dir,
filters=filters,
project_safe=project_safe)
tenant_safe=tenant_safe)
return {'events': [e.to_dict() for e in events]}

View File

@ -11,13 +11,14 @@
# License for the specific language governing permissions and limitations
# under the License.
from bilean.common import exception
from bilean.common.i18n import _
from bilean.common import utils
from bilean.db import api as db_api
from bilean.engine import api
from bilean.engine import event as event_mod
from bilean.engine import resource as resource_mod
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import timeutils
@ -34,7 +35,7 @@ class User(object):
)
def __init__(self, user_id, **kwargs):
self.id = user_id
self.id = user_id
self.policy_id = kwargs.get('policy_id', None)
self.balance = kwargs.get('balance', 0)
self.rate = kwargs.get('rate', 0.0)
@ -71,7 +72,7 @@ class User(object):
user = db_api.user_create(context, values)
self.created_at = user.created_at
return self.id
return self.id
@classmethod
def init_users(cls, context):
@ -80,13 +81,14 @@ class User(object):
tenants = k_client.tenants.list()
tenant_ids = [tenant.id for tenant in tenants]
users = self.load_all(context)
users = cls.load_all(context)
user_ids = [user.id for user in users]
for tid in tenant_ids:
if tid not in user_ids:
user = cls(tid, status=cls.INIT,
status_reason='Init from keystone')
status_reason='Init from keystone')
user.store(context)
return True
@classmethod
def _from_db_record(cls, record):
@ -111,12 +113,12 @@ class User(object):
@classmethod
def load(cls, context, user_id=None, user=None, realtime=False,
show_deleted=False, project_safe=True):
show_deleted=False, tenant_safe=True):
'''Retrieve a user from database.'''
if user is None:
user = db_api.user_get(context, user_id,
show_deleted=show_deleted,
project_safe=project_safe)
tenant_safe=tenant_safe)
if user is None:
raise exception.UserNotFound(user=user_id)
@ -127,19 +129,17 @@ class User(object):
seconds = (timeutils.utcnow() - u.last_bill).total_seconds()
u.balance -= u.rate * seconds
return u
@classmethod
def load_all(cls, context, show_deleted=False, limit=None,
marker=None, sort_keys=None, sort_dir=None,
filters=None, project_safe=True):
filters=None):
'''Retrieve all users of from database.'''
records = db_api.user_get_all(context, show_deleted=show_deleted,
limit=limit, marker=marker,
sort_keys=sort_keys, sort_dir=sort_dir,
filters=filters,
project_safe=project_safe)
filters=filters)
return [cls._from_db_record(record) for record in records]
@ -164,7 +164,6 @@ class User(object):
self.status = status
if reason:
self.status_reason = reason
#db_api.user_update(context, self.id, values)
def update_with_resource(self, context, resource, action='create'):
'''Update user with resource'''
@ -178,7 +177,7 @@ class User(object):
elif 'update' == action:
self.do_bill(context)
d_rate = resource.d_rate
self._change_user_rate(cnxt, d_rate)
self._change_user_rate(context, d_rate)
self.store(context)
def _change_user_rate(self, context, d_rate):
@ -186,15 +185,15 @@ class User(object):
old_rate = self.rate
new_rate = old_rate + d_rate
if old_rate == 0 and new_rate > 0:
self.last_bill = timeutils.utcnow()
self.last_bill = timeutils.utcnow()
if d_rate > 0 and self.status == self.FREE:
self.status = self.ACTIVE
elif d_rate < 0:
if new_rate == 0 and self.balance > 0:
self.status = self.FREE
self.status = self.FREE
elif self.status == self.WARNING:
prior_notify_time = cfg.CONF.bilean_task.prior_notify_time * 60
rest_usage = prior_notify_time * new_rate
p_time = cfg.CONF.bilean_task.prior_notify_time * 3600
rest_usage = p_time * new_rate
if self.balance > rest_usage:
self.status = self.ACTIVE
self.rate = new_rate
@ -207,15 +206,15 @@ class User(object):
if self.status == self.INIT and self.balance > 0:
self.set_status(self.ACTIVE, reason='Recharged')
elif self.status == self.FREEZE and self.balance > 0:
reason = "Status change from freeze to active because "
"of recharge."
reason = _("Status change from freeze to active because "
"of recharge.")
self.set_status(self.ACTIVE, reason=reason)
elif self.status == self.WARNING:
prior_notify_time = cfg.CONF.bilean_task.prior_notify_time * 60
prior_notify_time = cfg.CONF.bilean_task.prior_notify_time * 3600
rest_usage = prior_notify_time * self.rate
if self.balance > rest_usage:
reason = "Status change from warning to active because "
"of recharge."
reason = _("Status change from warning to active because "
"of recharge.")
self.set_status(self.ACTIVE, reason=reason)
event_mod.record(context, self.id, action='recharge', value=value)
self.store(context)

View File

@ -47,10 +47,10 @@ class Action(object):
def do_delete(self):
return NotImplemented
class ResourceAction(Action):
"""Notification controller for Resources."""
def __init__(self, cnxt, action, data):
super(ResourceAction, self).__init__(cnxt, action, data)

View File

@ -13,8 +13,9 @@
from bilean.common import context
from bilean.common.i18n import _
from bilean.notification import converter
from bilean.common.i18n import _LE
from bilean.notification import action as notify_action
from bilean.notification import converter
from oslo_log import log as logging
import oslo_messaging
@ -52,19 +53,18 @@ class EventsNotificationEndpoint(object):
"""Convert notifcation to user."""
user_id = notification['payload'].get('resource_info', None)
if not user_id:
LOG.error(_LE("Cannot retrieve user_id from notification: %s") %
notification)
LOG.error(_LE("Cannot retrieve user_id from notification: %s"),
notification)
return oslo_messaging.NotificationResult.HANDLED
action = self._get_action(notification['event_type'])
if action:
act = notify_action.UserAction(self.cnxt, action, user_id)
LOG.info(_("Notify engine to %(action)s user: %(user)s") %
{'action': action, 'user': user})
{'action': action, 'user': user_id})
act.execute()
return oslo_messaging.NotificationResult.HANDLED
def process_resource_notification(self, notification):
"""Convert notifcation to resources."""
resources = self.resource_converter.to_resources(notification)

View File

@ -41,7 +41,7 @@ class NotificationService(service.Service):
self.transport, self.targets, self.endpoints)
LOG.info(_("Starting listener on topic: %s"),
params.NOTIFICATION_TOPICS)
params.NOTIFICATION_TOPICS)
listener.start()
self.listeners.append(listener)

View File

@ -10,13 +10,9 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
from oslo_context import context as oslo_context
from oslo_log import log as logging
from oslo_utils import timeutils
from bilean.common import context
from bilean.common import exception
from bilean.common.i18n import _
from bilean.common import schema
@ -115,28 +111,34 @@ class Rule(object):
return cls(record.name, record.spec, **kwargs)
@classmethod
def load(cls, ctx, rule_id=None, rule=None):
def load(cls, context, rule_id=None, rule=None, show_deleted=False):
'''Retrieve a rule object from database.'''
if rule is None:
rule = db_api.rule_get(ctx, rule_id)
rule = db_api.rule_get(context, rule_id,
show_deleted=show_deleted)
if rule is None:
raise exception.RuleNotFound(rule=rule_id)
return cls.from_db_record(rule)
@classmethod
def load_all(cls, ctx):
def load_all(cls, context, show_deleted=False, limit=None,
marker=None, sort_keys=None, sort_dir=None,
filters=None):
'''Retrieve all rules from database.'''
records = db_api.rule_get_all(ctx)
records = db_api.rule_get_all(context, show_deleted=show_deleted,
limit=limit, marker=marker,
sort_keys=sort_keys, sort_dir=sort_dir,
filters=filters)
return [cls.from_db_record(record) for record in records)
return [cls.from_db_record(record) for record in records]
@classmethod
def delete(cls, ctx, rule_id):
db_api.rule_delete(ctx, rule_id)
def delete(cls, context, rule_id):
db_api.rule_delete(context, rule_id)
def store(self, ctx):
def store(self, context):
'''Store the rule into database and return its ID.'''
timestamp = timeutils.utcnow()
@ -150,11 +152,11 @@ class Rule(object):
if self.id:
self.updated_at = timestamp
values['updated_at'] = timestamp
db_api.rule_update(ctx, self.id, values)
db_api.rule_update(context, self.id, values)
else:
self.created_at = timestamp
values['created_at'] = timestamp
rule = db_api.rule_create(ctx, values)
rule = db_api.rule_create(context, values)
self.id = rule.id
return self.id

View File

@ -10,17 +10,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import base64
import copy
from oslo_log import log as logging
from oslo_utils import encodeutils
import six
from bilean.common import exception
from bilean.common.i18n import _
from bilean.common import schema
from bilean.common import utils
from bilean.rules import base
LOG = logging.getLogger(__name__)
@ -74,7 +68,7 @@ class ServerRule(base.Rule):
'''Get the price of resource in seconds.
If no exact price found, it shows that rule of the server's flavor
has not been set, will return 0 as the price notify admin to set
has not been set, will return 0 as the price notify admin to set
it.
:param: resource: Resource object to find price.

View File

@ -57,7 +57,4 @@ if __name__ == '__main__':
srv = engine.EngineService(cfg.CONF.host, params.ENGINE_TOPIC)
launcher = service.launch(cfg.CONF, srv,
workers=cfg.CONF.num_engine_workers)
# We create the periodic tasks here, which mean they are created
# only in the parent process when num_engine_workers>1 is specified
srv.create_bilean_tasks()
launcher.wait()

View File

@ -10,7 +10,6 @@ setenv = VIRTUAL_ENV={envdir}
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
usedevelop = True
install_command = pip install {opts} {packages}
commands =
find . -type f -name "*.pyc" -delete