streams definition POST, GET, GET (id), DELETE
This depends on ansible-monasca-schema changes here: https://github.com/hpcloud-mon/ansible-monasca-schema/pull/17 Here are sample curl commands for POST, GET, and DELETE: curl -i -X POST -H 'X-Auth-User: mini-mon' -H 'X-Auth-Token: 8c959d0296344c27a47b8e78dbf912ac' -H 'X-Auth-Key: password' -H 'Accept: application/json' -H 'User-Agent: python-monascaclient' -H 'Content-Tye: application/json' -d '{"fire_criteria": [{"event_type": "compute.instance.create.start"}, {"event_type": "compute.instance.create.end"}], "description": "provisioning duration", "name": "panda", "group_by": ["instance_id"], "expiration": 3, "select": [{"traits": {"tenant_id": "406904"}, "event_type": "compute.instance.create.*"}], "fire_actions": ["ed469bb9-2b4a-457a-9926-9da9f6ac75da"], "expire_actions":["ed469bb9-2b4a-457a-9926-9da9f6ac75da"]}' http://127.0.0.1:8080/v2.0/events/stream-definitions curl -i -X GET -H 'X-Auth-User: mini-mon' -H 'X-Auth-Token: 8c959d0296344c27a47b8e78dbf912ac' -H 'X-Auth-Key: password' -H 'Accept: application/json' -H 'User-Agent: python-monascaclient' -H 'Content-Type: application/json' http://127.0.0.1:8080/v2.0/events/stream-definitions/ curl -i -X DELETE -H 'X-Auth-User: mini-mon' -H 'X-Auth-Token: 8c959d0296344c27a47b8e78dbf912ac' -H 'X-Auth-Key: password' -H 'Accept: application/json' -H 'User-Agent: python-monascaclient' -H 'Content-Type: application/json' http://127.0.0.1:8080/v2.0/events/stream-definitions/86177f0e-f811-4c42-a91a-1813251bf93f Note: the limit parameter is passed into the streams_repository method for listing streams, but not yet used. We will open a separate Jira to to handle pagination with a user input limit parameter. Note: fixed a few events problems. stevedore wasn't loading the driver, and missing some abstact function defs. I tested stream-defintions and events, they both work now. Had to change the URI for stream-definitions because it conflicted with events, and now is more RESTful. Change-Id: I0b6dc385e1d095c1bd33867a038fe170ca277bfe
This commit is contained in:
parent
6b779a84b3
commit
f4f14c77fd
|
@ -10,6 +10,7 @@ region = useast
|
|||
|
||||
# Dispatchers to be loaded to serve restful APIs
|
||||
dispatcher = v2_ref_metrics
|
||||
dispatcher = v2_ref_stream_definitions
|
||||
dispatcher = v2_ref_alarms
|
||||
dispatcher = v2_ref_alarm_definitions
|
||||
dispatcher = v2_ref_events
|
||||
|
@ -41,8 +42,11 @@ events_message_format = reference
|
|||
# The driver to use for the metrics repository
|
||||
metrics_driver = influxdb_metrics_repo
|
||||
|
||||
# The driver to use for the stream definitions repository
|
||||
streams_driver = mysql_streams_repo
|
||||
|
||||
# The driver to use for the events repository
|
||||
events_driver = none
|
||||
events_driver = mysql_events_repo
|
||||
|
||||
# The driver to use for the transforms repository
|
||||
transforms_driver = mysql_transforms_repo
|
||||
|
|
|
@ -24,6 +24,14 @@ class EventsV2API(object):
|
|||
LOG.debug('initializing V2API!')
|
||||
self.global_conf = global_conf
|
||||
|
||||
@resource_api.Restify('/v2.0/events/', method='post')
|
||||
@resource_api.Restify('/v2.0/events', method='post')
|
||||
def do_post_events(self, req, res):
|
||||
res.status = '501 Not Implemented'
|
||||
|
||||
@resource_api.Restify('/v2.0/events', method='get')
|
||||
def do_get_events(self, req, res):
|
||||
res.status = '501 Not Implemented'
|
||||
|
||||
@resource_api.Restify('/v2.0/events/{id}', method='get')
|
||||
def do_get_event(self, req, res, id):
|
||||
res.status = '501 Not Implemented'
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
# Copyright 2015 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from monasca.common import resource_api
|
||||
from monasca.openstack.common import log
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class StreamDefinitionsV2API(object):
|
||||
|
||||
def __init__(self, global_conf):
|
||||
|
||||
LOG.debug('initializing StreamDefinitionsV2API!')
|
||||
self.global_conf = global_conf
|
||||
|
||||
@resource_api.Restify('/v2.0/stream-definitions', method='post')
|
||||
def do_post_stream_definitions(self, req, res):
|
||||
res.status = '501 Not Implemented'
|
||||
|
||||
@resource_api.Restify('/v2.0/stream-definitions/{id}', method='get')
|
||||
def do_get_stream_definition(self, req, res, id):
|
||||
res.status = '501 Not Implemented'
|
||||
|
||||
@resource_api.Restify('/v2.0/stream-definitions', method='get')
|
||||
def do_get_stream_definitions(self, req, res):
|
||||
res.status = '501 Not Implemented'
|
||||
|
||||
@resource_api.Restify(
|
||||
'/v2.0/stream-definitions/{id}', method='delete')
|
||||
def do_delete_stream_definitions(self, req, res, id):
|
||||
res.status = '501 Not Implemented'
|
|
@ -44,7 +44,7 @@ class MySQLRepository(object):
|
|||
|
||||
cnxn = mdb.connect(self.database_server, self.database_uid,
|
||||
self.database_pwd, self.database_name,
|
||||
use_unicode=True)
|
||||
use_unicode=True, charset='utf8')
|
||||
|
||||
cursor = cnxn.cursor(mdb.cursors.DictCursor)
|
||||
|
||||
|
@ -72,6 +72,8 @@ def mysql_try_catch_block(fun):
|
|||
raise
|
||||
except exceptions.InvalidUpdateException:
|
||||
raise
|
||||
except exceptions.AlreadyExistsException:
|
||||
raise
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
raise exceptions.RepositoryException(ex)
|
||||
|
|
|
@ -0,0 +1,200 @@
|
|||
# Copyright 2015 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import uuid
|
||||
|
||||
import MySQLdb
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from monasca.common.repositories import constants
|
||||
from monasca.common.repositories import exceptions
|
||||
from monasca.common.repositories.mysql import mysql_repository
|
||||
from monasca.common.repositories import streams_repository as sdr
|
||||
from monasca.openstack.common import log
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class StreamsRepository(mysql_repository.MySQLRepository,
|
||||
sdr.StreamsRepository):
|
||||
|
||||
base_query = """
|
||||
select sd.id, sd.tenant_id, sd.name, sd.description,
|
||||
sd.select_by, sd.group_by, sd.fire_criteria, sd.expiration,
|
||||
sd.actions_enabled, sd.created_at,
|
||||
sd.updated_at, sd.deleted_at,
|
||||
saf.fire_actions, sae.expire_actions
|
||||
from stream_definition as sd
|
||||
left join (select stream_definition_id,
|
||||
group_concat(action_id) as fire_actions
|
||||
from stream_actions
|
||||
where action_type = 'FIRE'
|
||||
group by stream_definition_id) as saf
|
||||
on saf.stream_definition_id = sd.id
|
||||
left join (select stream_definition_id,
|
||||
group_concat(action_id) as expire_actions
|
||||
from stream_actions
|
||||
where action_type = 'EXPIRE'
|
||||
group by stream_definition_id) as sae
|
||||
on sae.stream_definition_id = sd.id
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
|
||||
super(StreamsRepository, self).__init__()
|
||||
|
||||
@mysql_repository.mysql_try_catch_block
|
||||
def get_stream_definition(self, tenant_id, stream_definition_id):
|
||||
|
||||
parms = [tenant_id, stream_definition_id]
|
||||
|
||||
where_clause = """ where sd.tenant_id = %s
|
||||
and sd.id = %s
|
||||
and deleted_at is NULL """
|
||||
|
||||
query = StreamsRepository.base_query + where_clause
|
||||
|
||||
rows = self._execute_query(query, parms)
|
||||
|
||||
if rows:
|
||||
return rows[0]
|
||||
else:
|
||||
raise exceptions.DoesNotExistException
|
||||
|
||||
@mysql_repository.mysql_try_catch_block
|
||||
def get_stream_definitions(self, tenant_id, name, offset, limit):
|
||||
|
||||
parms = [tenant_id]
|
||||
|
||||
select_clause = StreamsRepository.base_query
|
||||
|
||||
where_clause = " where sd.tenant_id = %s and deleted_at is NULL "
|
||||
|
||||
if name:
|
||||
where_clause += " and sd.name = %s "
|
||||
parms.append(name.encode('utf8'))
|
||||
|
||||
if offset is not None:
|
||||
order_by_clause = " order by sd.id, sd.created_at "
|
||||
where_clause += " and sd.id > %s "
|
||||
parms.append(offset.encode('utf8'))
|
||||
limit_clause = " limit %s "
|
||||
parms.append(constants.PAGE_LIMIT)
|
||||
else:
|
||||
order_by_clause = " order by sd.created_at "
|
||||
limit_clause = ""
|
||||
|
||||
query = select_clause + where_clause + order_by_clause + limit_clause
|
||||
|
||||
return self._execute_query(query, parms)
|
||||
|
||||
@mysql_repository.mysql_try_catch_block
|
||||
def delete_stream_definition(self, tenant_id, stream_definition_id):
|
||||
"""Delete the stream definition.
|
||||
|
||||
:param tenant_id:
|
||||
:param stream_definition_id:
|
||||
:returns True: -- if stream definition exists and was deleted.
|
||||
:returns False: -- if the stream definition does not exists.
|
||||
:raises RepositoryException:
|
||||
"""
|
||||
|
||||
cnxn, cursor = self._get_cnxn_cursor_tuple()
|
||||
|
||||
with cnxn:
|
||||
|
||||
cursor.execute("""delete from stream_definition
|
||||
where tenant_id = %s and id = %s""",
|
||||
[tenant_id, stream_definition_id])
|
||||
|
||||
if cursor.rowcount < 1:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
@mysql_repository.mysql_try_catch_block
|
||||
def create_stream_definition(self,
|
||||
tenant_id,
|
||||
name,
|
||||
description,
|
||||
select,
|
||||
group_by,
|
||||
fire_criteria,
|
||||
expiration,
|
||||
fire_actions,
|
||||
expire_actions):
|
||||
cnxn, cursor = self._get_cnxn_cursor_tuple()
|
||||
|
||||
with cnxn:
|
||||
|
||||
now = timeutils.utcnow()
|
||||
stream_definition_id = str(uuid.uuid1())
|
||||
try:
|
||||
cursor.execute("""insert into stream_definition(
|
||||
id,
|
||||
tenant_id,
|
||||
name,
|
||||
description,
|
||||
select_by,
|
||||
group_by,
|
||||
fire_criteria,
|
||||
expiration,
|
||||
created_at,
|
||||
updated_at)
|
||||
values (%s, %s, %s, %s, %s, %s, %s, %s, %s,
|
||||
%s)""", (
|
||||
stream_definition_id, tenant_id, name.encode('utf8'),
|
||||
description.encode('utf8'), select.encode('utf8'),
|
||||
group_by.encode('utf8'), fire_criteria.encode('utf8'),
|
||||
expiration, now, now))
|
||||
except MySQLdb.IntegrityError as e:
|
||||
code, msg = e
|
||||
if code == 1062:
|
||||
raise exceptions.AlreadyExistsException(
|
||||
'Stream Definition already '
|
||||
'exists for tenant_id: {0} name: {1}'.format(
|
||||
tenant_id, name.encode('utf8')))
|
||||
else:
|
||||
raise e
|
||||
|
||||
self._insert_into_stream_actions(cursor, stream_definition_id,
|
||||
fire_actions, u"FIRE")
|
||||
self._insert_into_stream_actions(cursor, stream_definition_id,
|
||||
expire_actions,
|
||||
u"EXPIRE")
|
||||
|
||||
return stream_definition_id
|
||||
|
||||
def _insert_into_stream_actions(self, cursor, stream_definition_id,
|
||||
actions, action_type):
|
||||
|
||||
if actions is None:
|
||||
return
|
||||
|
||||
for action in actions:
|
||||
cursor.execute("select id from notification_method where id = %s",
|
||||
(action.encode('utf8'),))
|
||||
row = cursor.fetchone()
|
||||
if not row:
|
||||
raise exceptions.RepositoryException(
|
||||
"Non-existent notification id {} submitted for {} "
|
||||
"notification action".format(action.encode('utf8'),
|
||||
action_type.encode('utf8')))
|
||||
cursor.execute("""insert into stream_actions(
|
||||
stream_definition_id,
|
||||
action_type,
|
||||
action_id)
|
||||
values(%s,%s,%s)""", (
|
||||
stream_definition_id, action_type.encode('utf8'),
|
||||
action.encode('utf8')))
|
|
@ -0,0 +1,48 @@
|
|||
# Copyright 2015 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import abc
|
||||
|
||||
import six
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class StreamsRepository(object):
|
||||
|
||||
def __init__(self):
|
||||
super(StreamsRepository, self).__init__()
|
||||
|
||||
@abc.abstractmethod
|
||||
def create_stream_definition(self,
|
||||
tenant_id,
|
||||
name,
|
||||
description,
|
||||
select,
|
||||
group_by,
|
||||
fire_criteria,
|
||||
expiration,
|
||||
fire_actions,
|
||||
expire_actions):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def delete_stream_definition(self, tenant_id, stream_definition_id):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_stream_definition(self, tenant_id, stream_definition_id):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_stream_definitions(self, tenant_id, name, offset, limit):
|
||||
pass
|
|
@ -0,0 +1,53 @@
|
|||
# Copyright 2015 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import voluptuous
|
||||
|
||||
from monasca.openstack.common import log
|
||||
from monasca.v2.common.schemas import exceptions
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
MILLISEC_PER_DAY = 86400000
|
||||
MILLISEC_PER_WEEK = MILLISEC_PER_DAY * 7
|
||||
|
||||
stream_definition_schema = {
|
||||
voluptuous.Required('name'): voluptuous.All(voluptuous.Any(str, unicode),
|
||||
voluptuous.Length(max=140)),
|
||||
voluptuous.Required('select'): voluptuous.All(
|
||||
voluptuous.Any(list)),
|
||||
voluptuous.Required('group_by'): voluptuous.All(
|
||||
voluptuous.Any(list)),
|
||||
voluptuous.Required('fire_criteria'): voluptuous.All(
|
||||
voluptuous.Any(list)),
|
||||
voluptuous.Required('expiration'): voluptuous.All(
|
||||
voluptuous.Any(int), voluptuous.Range(min=0, max=MILLISEC_PER_WEEK)),
|
||||
|
||||
voluptuous.Optional('fire_actions'): voluptuous.All(
|
||||
voluptuous.Any([str], [unicode]), voluptuous.Length(max=400)),
|
||||
voluptuous.Optional('expire_actions'): voluptuous.All(
|
||||
voluptuous.Any([str], [unicode]), voluptuous.Length(max=400)),
|
||||
voluptuous.Optional('actions_enabled'): bool}
|
||||
|
||||
request_body_schema = voluptuous.Schema(stream_definition_schema,
|
||||
required=True, extra=True)
|
||||
|
||||
|
||||
def validate(msg):
|
||||
try:
|
||||
request_body_schema(msg)
|
||||
except Exception as ex:
|
||||
LOG.debug(ex)
|
||||
raise exceptions.ValidationException(str(ex))
|
|
@ -67,7 +67,9 @@ repositories_opts = [
|
|||
help='The repository driver to use for alarm definitions'),
|
||||
cfg.StrOpt('alarms_driver', default='mysql_alarms_repo',
|
||||
help='The repository driver to use for alarms'),
|
||||
cfg.StrOpt('events_driver', default='fake_events_repo',
|
||||
cfg.StrOpt('streams_driver', default='mysql_streams_repo',
|
||||
help='The repository driver to use for streams'),
|
||||
cfg.StrOpt('events_driver', default='mysql_events_repo',
|
||||
help='The repository driver to use for events'),
|
||||
cfg.StrOpt('transforms_driver', default='mysql_transforms_repo',
|
||||
help='The repository driver to use for transforms'),
|
||||
|
|
|
@ -128,7 +128,7 @@ class Events(monasca_events_api_v2.EventsV2API):
|
|||
|
||||
return event_id, event_data
|
||||
|
||||
@resource_api.Restify('/v2.0/events/', method='post')
|
||||
@resource_api.Restify('/v2.0/events', method='post')
|
||||
def do_post_events(self, req, res):
|
||||
helpers.validate_json_content_type(req)
|
||||
helpers.validate_authorization(req, self._post_events_authorized_roles)
|
||||
|
@ -140,7 +140,7 @@ class Events(monasca_events_api_v2.EventsV2API):
|
|||
self._send_event(transformed_event)
|
||||
res.status = falcon.HTTP_204
|
||||
|
||||
@resource_api.Restify('/v2.0/events/', method='get')
|
||||
@resource_api.Restify('/v2.0/events', method='get')
|
||||
def do_get_events(self, req, res):
|
||||
helpers.validate_authorization(req, self._default_authorized_roles)
|
||||
tenant_id = helpers.get_tenant_id(req)
|
||||
|
@ -152,10 +152,10 @@ class Events(monasca_events_api_v2.EventsV2API):
|
|||
res.body = helpers.dumpit_utf8(result)
|
||||
res.status = falcon.HTTP_200
|
||||
|
||||
@resource_api.Restify('/v2.0/events/{event_id}', method='get')
|
||||
def do_get_event(self, req, res, event_id):
|
||||
@resource_api.Restify('/v2.0/events/{id}', method='get')
|
||||
def do_get_event(self, req, res, id):
|
||||
helpers.validate_authorization(req, self._default_authorized_roles)
|
||||
tenant_id = helpers.get_tenant_id(req)
|
||||
result = self._list_event(tenant_id, event_id, req.uri)
|
||||
result = self._list_event(tenant_id, id, req.uri)
|
||||
res.body = helpers.dumpit_utf8(result)
|
||||
res.status = falcon.HTTP_200
|
||||
|
|
|
@ -0,0 +1,346 @@
|
|||
# Copyright 2015 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
import re
|
||||
|
||||
import falcon
|
||||
from oslo.config import cfg
|
||||
|
||||
from monasca.api import stream_definitions_api_v2
|
||||
from monasca.common.messaging import exceptions as message_queue_exceptions
|
||||
from monasca.common.repositories import exceptions
|
||||
from monasca.common import resource_api
|
||||
from monasca.openstack.common import log
|
||||
from monasca.v2.common.schemas import (stream_definition_request_body_schema
|
||||
as schema_streams)
|
||||
from monasca.v2.common.schemas import exceptions as schemas_exceptions
|
||||
from monasca.v2.reference import helpers
|
||||
from monasca.v2.reference import resource
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class StreamDefinitions(stream_definitions_api_v2.StreamDefinitionsV2API):
|
||||
|
||||
def __init__(self, global_conf):
|
||||
|
||||
try:
|
||||
|
||||
super(StreamDefinitions, self).__init__(global_conf)
|
||||
|
||||
self._region = cfg.CONF.region
|
||||
|
||||
self._default_authorized_roles = (
|
||||
cfg.CONF.security.default_authorized_roles)
|
||||
self._delegate_authorized_roles = (
|
||||
cfg.CONF.security.delegate_authorized_roles)
|
||||
self._post_authorized_roles = (
|
||||
cfg.CONF.security.default_authorized_roles +
|
||||
cfg.CONF.security.agent_authorized_roles)
|
||||
self._stream_definitions_repo = resource_api.init_driver(
|
||||
'monasca.repositories', cfg.CONF.repositories.streams_driver)
|
||||
self.stream_definition_event_message_queue = (
|
||||
resource_api.init_driver('monasca.messaging',
|
||||
cfg.CONF.messaging.driver,
|
||||
(['stream-definitions'])))
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
raise exceptions.RepositoryException(ex)
|
||||
|
||||
@resource_api.Restify('/v2.0/stream-definitions', method='post')
|
||||
def do_post_stream_definitions(self, req, res):
|
||||
helpers.validate_authorization(req, self._default_authorized_roles)
|
||||
|
||||
stream_definition = helpers.read_json_msg_body(req)
|
||||
|
||||
self._validate_stream_definition(stream_definition)
|
||||
|
||||
tenant_id = helpers.get_tenant_id(req)
|
||||
name = get_query_stream_definition_name(stream_definition)
|
||||
description = get_query_stream_definition_description(
|
||||
stream_definition)
|
||||
select = stream_definition['select']
|
||||
group_by = stream_definition['group_by']
|
||||
fire_criteria = stream_definition['fire_criteria']
|
||||
expiration = stream_definition['expiration']
|
||||
fire_actions = get_query_stream_definition_fire_actions(
|
||||
stream_definition)
|
||||
expire_actions = get_query_stream_definition_expire_actions(
|
||||
stream_definition)
|
||||
|
||||
result = self._stream_definition_create(tenant_id, name, description,
|
||||
select, group_by,
|
||||
fire_criteria, expiration,
|
||||
fire_actions, expire_actions)
|
||||
|
||||
helpers.add_links_to_resource(result, req.uri)
|
||||
res.body = helpers.dumpit_utf8(result)
|
||||
res.status = falcon.HTTP_201
|
||||
|
||||
@resource_api.Restify('/v2.0/stream-definitions/{id}', method='get')
|
||||
def do_get_stream_definition(self, req, res, id):
|
||||
|
||||
helpers.validate_authorization(req, self._default_authorized_roles)
|
||||
tenant_id = helpers.get_tenant_id(req)
|
||||
|
||||
result = self._stream_definition_show(tenant_id, id)
|
||||
|
||||
helpers.add_links_to_resource(result, re.sub('/' + id, '', req.uri))
|
||||
res.body = helpers.dumpit_utf8(result)
|
||||
res.status = falcon.HTTP_200
|
||||
|
||||
@resource_api.Restify('/v2.0/stream-definitions', method='get')
|
||||
def do_get_stream_definitions(self, req, res):
|
||||
|
||||
helpers.validate_authorization(req, self._default_authorized_roles)
|
||||
tenant_id = helpers.get_tenant_id(req)
|
||||
name = helpers.get_query_name(req)
|
||||
offset = helpers.normalize_offset(helpers.get_query_param(req,
|
||||
'offset'))
|
||||
limit = helpers.get_query_param(req, 'limit')
|
||||
result = self._stream_definition_list(tenant_id, name,
|
||||
req.uri, offset, limit)
|
||||
|
||||
res.body = helpers.dumpit_utf8(result)
|
||||
res.status = falcon.HTTP_200
|
||||
|
||||
@resource_api.Restify(
|
||||
'/v2.0/stream-definitions/{id}', method='delete')
|
||||
def do_delete_stream_definitions(self, req, res, id):
|
||||
|
||||
helpers.validate_authorization(req, self._default_authorized_roles)
|
||||
tenant_id = helpers.get_tenant_id(req)
|
||||
self._stream_definition_delete(tenant_id, id)
|
||||
res.status = falcon.HTTP_204
|
||||
|
||||
@resource.resource_try_catch_block
|
||||
def _stream_definition_delete(self, tenant_id, id):
|
||||
|
||||
stream_definition_row = (
|
||||
self._stream_definitions_repo.get_stream_definition(tenant_id, id))
|
||||
|
||||
if not self._stream_definitions_repo.delete_stream_definition(
|
||||
tenant_id, id):
|
||||
raise falcon.HTTPNotFound
|
||||
|
||||
self._send_stream_definition_deleted_event(
|
||||
id, tenant_id, stream_definition_row['name'])
|
||||
|
||||
def _validate_stream_definition(self, stream_definition):
|
||||
|
||||
try:
|
||||
schema_streams.validate(stream_definition)
|
||||
except schemas_exceptions.ValidationException as ex:
|
||||
LOG.debug(ex)
|
||||
raise falcon.HTTPBadRequest('Bad request', ex.message)
|
||||
|
||||
@resource.resource_try_catch_block
|
||||
def _stream_definition_create(self, tenant_id, name,
|
||||
description, select, group_by,
|
||||
fire_criteria, expiration,
|
||||
fire_actions, expire_actions):
|
||||
|
||||
stream_definition_id = (
|
||||
self._stream_definitions_repo.
|
||||
create_stream_definition(tenant_id,
|
||||
name,
|
||||
description,
|
||||
json.dumps(select),
|
||||
json.dumps(group_by),
|
||||
json.dumps(fire_criteria),
|
||||
expiration,
|
||||
fire_actions,
|
||||
expire_actions))
|
||||
|
||||
self._send_stream_definition_created_event(tenant_id,
|
||||
stream_definition_id,
|
||||
name,
|
||||
select,
|
||||
group_by,
|
||||
fire_criteria,
|
||||
expiration)
|
||||
result = (
|
||||
{u'name': name,
|
||||
u'id': stream_definition_id,
|
||||
u'description': description,
|
||||
u'select': select,
|
||||
u'group_by': group_by,
|
||||
u'fire_criteria': fire_criteria,
|
||||
u'expiration': expiration,
|
||||
u'fire_actions': fire_actions,
|
||||
u'expire_actions': expire_actions,
|
||||
u'actions_enabled': u'true'}
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
def send_event(self, message_queue, event_msg):
|
||||
try:
|
||||
message_queue.send_message(
|
||||
helpers.dumpit_utf8(event_msg))
|
||||
except message_queue_exceptions.MessageQueueException as ex:
|
||||
LOG.exception(ex)
|
||||
raise falcon.HTTPInternalServerError(
|
||||
'Message queue service unavailable'.encode('utf8'),
|
||||
ex.message.encode('utf8'))
|
||||
|
||||
@resource.resource_try_catch_block
|
||||
def _stream_definition_show(self, tenant_id, id):
|
||||
|
||||
stream_definition_row = (
|
||||
self._stream_definitions_repo.get_stream_definition(tenant_id, id))
|
||||
|
||||
return self._build_stream_definition_show_result(stream_definition_row)
|
||||
|
||||
@resource.resource_try_catch_block
|
||||
def _stream_definition_list(self, tenant_id, name, req_uri,
|
||||
offset, limit):
|
||||
|
||||
stream_definition_rows = (
|
||||
self._stream_definitions_repo.get_stream_definitions(
|
||||
tenant_id, name, offset, limit))
|
||||
result = []
|
||||
for stream_definition_row in stream_definition_rows:
|
||||
sd = self._build_stream_definition_show_result(
|
||||
stream_definition_row)
|
||||
helpers.add_links_to_resource(sd, req_uri)
|
||||
result.append(sd)
|
||||
|
||||
result = helpers.paginate(result, req_uri, offset)
|
||||
|
||||
return result
|
||||
|
||||
def _build_stream_definition_show_result(self, stream_definition_row):
|
||||
|
||||
fire_actions_list = get_comma_separated_str_as_list(
|
||||
stream_definition_row['fire_actions'])
|
||||
|
||||
expire_actions_list = get_comma_separated_str_as_list(
|
||||
stream_definition_row['expire_actions'])
|
||||
|
||||
result = (
|
||||
{u'name': stream_definition_row['name'],
|
||||
u'id': stream_definition_row['id'],
|
||||
u'description': stream_definition_row['description'],
|
||||
u'select': json.loads(stream_definition_row['select_by']),
|
||||
u'group_by': json.loads(stream_definition_row['group_by']),
|
||||
u'fire_criteria': json.loads(
|
||||
stream_definition_row['fire_criteria']),
|
||||
u'expiration': stream_definition_row['expiration'],
|
||||
u'fire_actions': fire_actions_list,
|
||||
u'expire_actions': expire_actions_list,
|
||||
u'actions_enabled': stream_definition_row['actions_enabled'] == 1,
|
||||
u'created_at': stream_definition_row['created_at'].isoformat(),
|
||||
u'updated_at': stream_definition_row['updated_at'].isoformat()}
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
def _send_stream_definition_deleted_event(self, stream_definition_id,
|
||||
tenant_id, stream_name):
|
||||
|
||||
stream_definition_deleted_event_msg = {
|
||||
u"stream-definition-deleted": {u'tenant_id': tenant_id,
|
||||
u'stream_definition_id':
|
||||
stream_definition_id,
|
||||
u'name': stream_name}}
|
||||
|
||||
self.send_event(self.stream_definition_event_message_queue,
|
||||
stream_definition_deleted_event_msg)
|
||||
|
||||
def _send_stream_definition_created_event(self, tenant_id,
|
||||
stream_definition_id,
|
||||
name,
|
||||
select,
|
||||
group_by,
|
||||
fire_criteria,
|
||||
expiration):
|
||||
|
||||
stream_definition_created_event_msg = {
|
||||
u'stream-definition-created': {u'tenant_id': tenant_id,
|
||||
u'stream_definition_id':
|
||||
stream_definition_id,
|
||||
u'name': name,
|
||||
u'select': select,
|
||||
u'group_by': group_by,
|
||||
u'fire_criteria': fire_criteria,
|
||||
u'expiration': expiration}
|
||||
}
|
||||
|
||||
self.send_event(self.stream_definition_event_message_queue,
|
||||
stream_definition_created_event_msg)
|
||||
|
||||
|
||||
def get_query_stream_definition_name(stream_definition):
|
||||
return (stream_definition['name'])
|
||||
|
||||
|
||||
def get_query_stream_definition_description(stream_definition,
|
||||
return_none=False):
|
||||
if 'description' in stream_definition:
|
||||
return stream_definition['description']
|
||||
else:
|
||||
if return_none:
|
||||
return None
|
||||
else:
|
||||
return ''
|
||||
|
||||
|
||||
def get_query_stream_definition_fire_actions(stream_definition,
|
||||
return_none=False):
|
||||
if 'fire_actions' in stream_definition:
|
||||
return stream_definition['fire_actions']
|
||||
else:
|
||||
if return_none:
|
||||
return None
|
||||
else:
|
||||
return []
|
||||
|
||||
|
||||
def get_query_stream_definition_expire_actions(stream_definition,
|
||||
return_none=False):
|
||||
if 'expire_actions' in stream_definition:
|
||||
return stream_definition['expire_actions']
|
||||
else:
|
||||
if return_none:
|
||||
return None
|
||||
else:
|
||||
return []
|
||||
|
||||
|
||||
def get_query_stream_definition_actions_enabled(stream_definition,
|
||||
required=False,
|
||||
return_none=False):
|
||||
try:
|
||||
if 'actions_enabled' in stream_definition:
|
||||
return (stream_definition['actions_enabled'])
|
||||
else:
|
||||
if return_none:
|
||||
return None
|
||||
elif required:
|
||||
raise Exception("Missing actions-enabled")
|
||||
else:
|
||||
return ''
|
||||
except Exception as ex:
|
||||
LOG.debug(ex)
|
||||
raise falcon.HTTPBadRequest('Bad request', ex.message)
|
||||
|
||||
|
||||
def get_comma_separated_str_as_list(comma_separated_str):
|
||||
if not comma_separated_str:
|
||||
return []
|
||||
else:
|
||||
return comma_separated_str.decode('utf8').split(',')
|
|
@ -44,6 +44,7 @@ monasca.dispatcher =
|
|||
v2_ref_transforms = monasca.v2.reference.transforms:Transforms
|
||||
v2_ref_notifications = monasca.v2.reference.notifications:Notifications
|
||||
demo = monasca.v2.reference.demo_dispatcher:DemoDispatcher
|
||||
v2_ref_stream_definitions = monasca.v2.reference.stream_definitions:StreamDefinitions
|
||||
|
||||
paste.filter_factory =
|
||||
login = monasca.middleware.login:filter_factory
|
||||
|
@ -65,6 +66,7 @@ monasca.repositories =
|
|||
mysql_alarms_repo = monasca.common.repositories.mysql.alarms_repository:AlarmsRepository
|
||||
mysql_notifications_repo = monasca.common.repositories.mysql.notifications_repository:NotificationsRepository
|
||||
mysql_events_repo = monasca.common.repositories.mysql.events_repository:EventsRepository
|
||||
mysql_streams_repo = monasca.common.repositories.mysql.streams_repository:StreamsRepository
|
||||
|
||||
[pbr]
|
||||
warnerrors = True
|
||||
|
|
Loading…
Reference in New Issue