diff --git a/etc/monasca.conf b/etc/monasca.conf index 7e86b5fe2..a50ccd58d 100755 --- a/etc/monasca.conf +++ b/etc/monasca.conf @@ -10,6 +10,7 @@ region = useast # Dispatchers to be loaded to serve restful APIs dispatcher = v2_ref_metrics +dispatcher = v2_ref_stream_definitions dispatcher = v2_ref_alarms dispatcher = v2_ref_alarm_definitions dispatcher = v2_ref_events @@ -41,8 +42,11 @@ events_message_format = reference # The driver to use for the metrics repository metrics_driver = influxdb_metrics_repo +# The driver to use for the stream definitions repository +streams_driver = mysql_streams_repo + # The driver to use for the events repository -events_driver = none +events_driver = mysql_events_repo # The driver to use for the transforms repository transforms_driver = mysql_transforms_repo diff --git a/monasca/api/monasca_events_api_v2.py b/monasca/api/monasca_events_api_v2.py index f930ccf11..e742b4546 100644 --- a/monasca/api/monasca_events_api_v2.py +++ b/monasca/api/monasca_events_api_v2.py @@ -24,6 +24,14 @@ class EventsV2API(object): LOG.debug('initializing V2API!') self.global_conf = global_conf - @resource_api.Restify('/v2.0/events/', method='post') + @resource_api.Restify('/v2.0/events', method='post') def do_post_events(self, req, res): res.status = '501 Not Implemented' + + @resource_api.Restify('/v2.0/events', method='get') + def do_get_events(self, req, res): + res.status = '501 Not Implemented' + + @resource_api.Restify('/v2.0/events/{id}', method='get') + def do_get_event(self, req, res, id): + res.status = '501 Not Implemented' diff --git a/monasca/api/stream_definitions_api_v2.py b/monasca/api/stream_definitions_api_v2.py new file mode 100644 index 000000000..19c14654a --- /dev/null +++ b/monasca/api/stream_definitions_api_v2.py @@ -0,0 +1,42 @@ +# Copyright 2015 Hewlett-Packard +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from monasca.common import resource_api +from monasca.openstack.common import log + +LOG = log.getLogger(__name__) + + +class StreamDefinitionsV2API(object): + + def __init__(self, global_conf): + + LOG.debug('initializing StreamDefinitionsV2API!') + self.global_conf = global_conf + + @resource_api.Restify('/v2.0/stream-definitions', method='post') + def do_post_stream_definitions(self, req, res): + res.status = '501 Not Implemented' + + @resource_api.Restify('/v2.0/stream-definitions/{id}', method='get') + def do_get_stream_definition(self, req, res, id): + res.status = '501 Not Implemented' + + @resource_api.Restify('/v2.0/stream-definitions', method='get') + def do_get_stream_definitions(self, req, res): + res.status = '501 Not Implemented' + + @resource_api.Restify( + '/v2.0/stream-definitions/{id}', method='delete') + def do_delete_stream_definitions(self, req, res, id): + res.status = '501 Not Implemented' diff --git a/monasca/common/repositories/mysql/mysql_repository.py b/monasca/common/repositories/mysql/mysql_repository.py index 33f28901c..6241e9402 100644 --- a/monasca/common/repositories/mysql/mysql_repository.py +++ b/monasca/common/repositories/mysql/mysql_repository.py @@ -44,7 +44,7 @@ class MySQLRepository(object): cnxn = mdb.connect(self.database_server, self.database_uid, self.database_pwd, self.database_name, - use_unicode=True) + use_unicode=True, charset='utf8') cursor = cnxn.cursor(mdb.cursors.DictCursor) @@ -72,6 +72,8 @@ def mysql_try_catch_block(fun): raise except exceptions.InvalidUpdateException: raise + except exceptions.AlreadyExistsException: + raise except Exception as ex: LOG.exception(ex) raise exceptions.RepositoryException(ex) diff --git a/monasca/common/repositories/mysql/streams_repository.py b/monasca/common/repositories/mysql/streams_repository.py new file mode 100644 index 000000000..04b526693 --- /dev/null +++ b/monasca/common/repositories/mysql/streams_repository.py @@ -0,0 +1,200 @@ +# Copyright 2015 Hewlett-Packard +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import uuid + +import MySQLdb +from oslo_utils import timeutils + +from monasca.common.repositories import constants +from monasca.common.repositories import exceptions +from monasca.common.repositories.mysql import mysql_repository +from monasca.common.repositories import streams_repository as sdr +from monasca.openstack.common import log + + +LOG = log.getLogger(__name__) + + +class StreamsRepository(mysql_repository.MySQLRepository, + sdr.StreamsRepository): + + base_query = """ + select sd.id, sd.tenant_id, sd.name, sd.description, + sd.select_by, sd.group_by, sd.fire_criteria, sd.expiration, + sd.actions_enabled, sd.created_at, + sd.updated_at, sd.deleted_at, + saf.fire_actions, sae.expire_actions + from stream_definition as sd + left join (select stream_definition_id, + group_concat(action_id) as fire_actions + from stream_actions + where action_type = 'FIRE' + group by stream_definition_id) as saf + on saf.stream_definition_id = sd.id + left join (select stream_definition_id, + group_concat(action_id) as expire_actions + from stream_actions + where action_type = 'EXPIRE' + group by stream_definition_id) as sae + on sae.stream_definition_id = sd.id + """ + + def __init__(self): + + super(StreamsRepository, self).__init__() + + @mysql_repository.mysql_try_catch_block + def get_stream_definition(self, tenant_id, stream_definition_id): + + parms = [tenant_id, stream_definition_id] + + where_clause = """ where sd.tenant_id = %s + and sd.id = %s + and deleted_at is NULL """ + + query = StreamsRepository.base_query + where_clause + + rows = self._execute_query(query, parms) + + if rows: + return rows[0] + else: + raise exceptions.DoesNotExistException + + @mysql_repository.mysql_try_catch_block + def get_stream_definitions(self, tenant_id, name, offset, limit): + + parms = [tenant_id] + + select_clause = StreamsRepository.base_query + + where_clause = " where sd.tenant_id = %s and deleted_at is NULL " + + if name: + where_clause += " and sd.name = %s " + parms.append(name.encode('utf8')) + + if offset is not None: + order_by_clause = " order by sd.id, sd.created_at " + where_clause += " and sd.id > %s " + parms.append(offset.encode('utf8')) + limit_clause = " limit %s " + parms.append(constants.PAGE_LIMIT) + else: + order_by_clause = " order by sd.created_at " + limit_clause = "" + + query = select_clause + where_clause + order_by_clause + limit_clause + + return self._execute_query(query, parms) + + @mysql_repository.mysql_try_catch_block + def delete_stream_definition(self, tenant_id, stream_definition_id): + """Delete the stream definition. + + :param tenant_id: + :param stream_definition_id: + :returns True: -- if stream definition exists and was deleted. + :returns False: -- if the stream definition does not exists. + :raises RepositoryException: + """ + + cnxn, cursor = self._get_cnxn_cursor_tuple() + + with cnxn: + + cursor.execute("""delete from stream_definition + where tenant_id = %s and id = %s""", + [tenant_id, stream_definition_id]) + + if cursor.rowcount < 1: + return False + + return True + + @mysql_repository.mysql_try_catch_block + def create_stream_definition(self, + tenant_id, + name, + description, + select, + group_by, + fire_criteria, + expiration, + fire_actions, + expire_actions): + cnxn, cursor = self._get_cnxn_cursor_tuple() + + with cnxn: + + now = timeutils.utcnow() + stream_definition_id = str(uuid.uuid1()) + try: + cursor.execute("""insert into stream_definition( + id, + tenant_id, + name, + description, + select_by, + group_by, + fire_criteria, + expiration, + created_at, + updated_at) + values (%s, %s, %s, %s, %s, %s, %s, %s, %s, + %s)""", ( + stream_definition_id, tenant_id, name.encode('utf8'), + description.encode('utf8'), select.encode('utf8'), + group_by.encode('utf8'), fire_criteria.encode('utf8'), + expiration, now, now)) + except MySQLdb.IntegrityError as e: + code, msg = e + if code == 1062: + raise exceptions.AlreadyExistsException( + 'Stream Definition already ' + 'exists for tenant_id: {0} name: {1}'.format( + tenant_id, name.encode('utf8'))) + else: + raise e + + self._insert_into_stream_actions(cursor, stream_definition_id, + fire_actions, u"FIRE") + self._insert_into_stream_actions(cursor, stream_definition_id, + expire_actions, + u"EXPIRE") + + return stream_definition_id + + def _insert_into_stream_actions(self, cursor, stream_definition_id, + actions, action_type): + + if actions is None: + return + + for action in actions: + cursor.execute("select id from notification_method where id = %s", + (action.encode('utf8'),)) + row = cursor.fetchone() + if not row: + raise exceptions.RepositoryException( + "Non-existent notification id {} submitted for {} " + "notification action".format(action.encode('utf8'), + action_type.encode('utf8'))) + cursor.execute("""insert into stream_actions( + stream_definition_id, + action_type, + action_id) + values(%s,%s,%s)""", ( + stream_definition_id, action_type.encode('utf8'), + action.encode('utf8'))) diff --git a/monasca/common/repositories/streams_repository.py b/monasca/common/repositories/streams_repository.py new file mode 100644 index 000000000..4bbc3ced8 --- /dev/null +++ b/monasca/common/repositories/streams_repository.py @@ -0,0 +1,48 @@ +# Copyright 2015 Hewlett-Packard +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import abc + +import six + + +@six.add_metaclass(abc.ABCMeta) +class StreamsRepository(object): + + def __init__(self): + super(StreamsRepository, self).__init__() + + @abc.abstractmethod + def create_stream_definition(self, + tenant_id, + name, + description, + select, + group_by, + fire_criteria, + expiration, + fire_actions, + expire_actions): + pass + + @abc.abstractmethod + def delete_stream_definition(self, tenant_id, stream_definition_id): + pass + + @abc.abstractmethod + def get_stream_definition(self, tenant_id, stream_definition_id): + pass + + @abc.abstractmethod + def get_stream_definitions(self, tenant_id, name, offset, limit): + pass diff --git a/monasca/v2/common/schemas/stream_definition_request_body_schema.py b/monasca/v2/common/schemas/stream_definition_request_body_schema.py new file mode 100644 index 000000000..24f3d5dce --- /dev/null +++ b/monasca/v2/common/schemas/stream_definition_request_body_schema.py @@ -0,0 +1,53 @@ +# Copyright 2015 Hewlett-Packard +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import voluptuous + +from monasca.openstack.common import log +from monasca.v2.common.schemas import exceptions + + +LOG = log.getLogger(__name__) + +MILLISEC_PER_DAY = 86400000 +MILLISEC_PER_WEEK = MILLISEC_PER_DAY * 7 + +stream_definition_schema = { + voluptuous.Required('name'): voluptuous.All(voluptuous.Any(str, unicode), + voluptuous.Length(max=140)), + voluptuous.Required('select'): voluptuous.All( + voluptuous.Any(list)), + voluptuous.Required('group_by'): voluptuous.All( + voluptuous.Any(list)), + voluptuous.Required('fire_criteria'): voluptuous.All( + voluptuous.Any(list)), + voluptuous.Required('expiration'): voluptuous.All( + voluptuous.Any(int), voluptuous.Range(min=0, max=MILLISEC_PER_WEEK)), + + voluptuous.Optional('fire_actions'): voluptuous.All( + voluptuous.Any([str], [unicode]), voluptuous.Length(max=400)), + voluptuous.Optional('expire_actions'): voluptuous.All( + voluptuous.Any([str], [unicode]), voluptuous.Length(max=400)), + voluptuous.Optional('actions_enabled'): bool} + +request_body_schema = voluptuous.Schema(stream_definition_schema, + required=True, extra=True) + + +def validate(msg): + try: + request_body_schema(msg) + except Exception as ex: + LOG.debug(ex) + raise exceptions.ValidationException(str(ex)) diff --git a/monasca/v2/reference/__init__.py b/monasca/v2/reference/__init__.py index bc5d1b34f..8815cf10d 100755 --- a/monasca/v2/reference/__init__.py +++ b/monasca/v2/reference/__init__.py @@ -67,7 +67,9 @@ repositories_opts = [ help='The repository driver to use for alarm definitions'), cfg.StrOpt('alarms_driver', default='mysql_alarms_repo', help='The repository driver to use for alarms'), - cfg.StrOpt('events_driver', default='fake_events_repo', + cfg.StrOpt('streams_driver', default='mysql_streams_repo', + help='The repository driver to use for streams'), + cfg.StrOpt('events_driver', default='mysql_events_repo', help='The repository driver to use for events'), cfg.StrOpt('transforms_driver', default='mysql_transforms_repo', help='The repository driver to use for transforms'), diff --git a/monasca/v2/reference/events.py b/monasca/v2/reference/events.py index f753c76c1..2044096b9 100644 --- a/monasca/v2/reference/events.py +++ b/monasca/v2/reference/events.py @@ -128,7 +128,7 @@ class Events(monasca_events_api_v2.EventsV2API): return event_id, event_data - @resource_api.Restify('/v2.0/events/', method='post') + @resource_api.Restify('/v2.0/events', method='post') def do_post_events(self, req, res): helpers.validate_json_content_type(req) helpers.validate_authorization(req, self._post_events_authorized_roles) @@ -140,7 +140,7 @@ class Events(monasca_events_api_v2.EventsV2API): self._send_event(transformed_event) res.status = falcon.HTTP_204 - @resource_api.Restify('/v2.0/events/', method='get') + @resource_api.Restify('/v2.0/events', method='get') def do_get_events(self, req, res): helpers.validate_authorization(req, self._default_authorized_roles) tenant_id = helpers.get_tenant_id(req) @@ -152,10 +152,10 @@ class Events(monasca_events_api_v2.EventsV2API): res.body = helpers.dumpit_utf8(result) res.status = falcon.HTTP_200 - @resource_api.Restify('/v2.0/events/{event_id}', method='get') - def do_get_event(self, req, res, event_id): + @resource_api.Restify('/v2.0/events/{id}', method='get') + def do_get_event(self, req, res, id): helpers.validate_authorization(req, self._default_authorized_roles) tenant_id = helpers.get_tenant_id(req) - result = self._list_event(tenant_id, event_id, req.uri) + result = self._list_event(tenant_id, id, req.uri) res.body = helpers.dumpit_utf8(result) res.status = falcon.HTTP_200 diff --git a/monasca/v2/reference/stream_definitions.py b/monasca/v2/reference/stream_definitions.py new file mode 100644 index 000000000..d93f36cea --- /dev/null +++ b/monasca/v2/reference/stream_definitions.py @@ -0,0 +1,346 @@ +# Copyright 2015 Hewlett-Packard +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json +import re + +import falcon +from oslo.config import cfg + +from monasca.api import stream_definitions_api_v2 +from monasca.common.messaging import exceptions as message_queue_exceptions +from monasca.common.repositories import exceptions +from monasca.common import resource_api +from monasca.openstack.common import log +from monasca.v2.common.schemas import (stream_definition_request_body_schema + as schema_streams) +from monasca.v2.common.schemas import exceptions as schemas_exceptions +from monasca.v2.reference import helpers +from monasca.v2.reference import resource + + +LOG = log.getLogger(__name__) + + +class StreamDefinitions(stream_definitions_api_v2.StreamDefinitionsV2API): + + def __init__(self, global_conf): + + try: + + super(StreamDefinitions, self).__init__(global_conf) + + self._region = cfg.CONF.region + + self._default_authorized_roles = ( + cfg.CONF.security.default_authorized_roles) + self._delegate_authorized_roles = ( + cfg.CONF.security.delegate_authorized_roles) + self._post_authorized_roles = ( + cfg.CONF.security.default_authorized_roles + + cfg.CONF.security.agent_authorized_roles) + self._stream_definitions_repo = resource_api.init_driver( + 'monasca.repositories', cfg.CONF.repositories.streams_driver) + self.stream_definition_event_message_queue = ( + resource_api.init_driver('monasca.messaging', + cfg.CONF.messaging.driver, + (['stream-definitions']))) + except Exception as ex: + LOG.exception(ex) + raise exceptions.RepositoryException(ex) + + @resource_api.Restify('/v2.0/stream-definitions', method='post') + def do_post_stream_definitions(self, req, res): + helpers.validate_authorization(req, self._default_authorized_roles) + + stream_definition = helpers.read_json_msg_body(req) + + self._validate_stream_definition(stream_definition) + + tenant_id = helpers.get_tenant_id(req) + name = get_query_stream_definition_name(stream_definition) + description = get_query_stream_definition_description( + stream_definition) + select = stream_definition['select'] + group_by = stream_definition['group_by'] + fire_criteria = stream_definition['fire_criteria'] + expiration = stream_definition['expiration'] + fire_actions = get_query_stream_definition_fire_actions( + stream_definition) + expire_actions = get_query_stream_definition_expire_actions( + stream_definition) + + result = self._stream_definition_create(tenant_id, name, description, + select, group_by, + fire_criteria, expiration, + fire_actions, expire_actions) + + helpers.add_links_to_resource(result, req.uri) + res.body = helpers.dumpit_utf8(result) + res.status = falcon.HTTP_201 + + @resource_api.Restify('/v2.0/stream-definitions/{id}', method='get') + def do_get_stream_definition(self, req, res, id): + + helpers.validate_authorization(req, self._default_authorized_roles) + tenant_id = helpers.get_tenant_id(req) + + result = self._stream_definition_show(tenant_id, id) + + helpers.add_links_to_resource(result, re.sub('/' + id, '', req.uri)) + res.body = helpers.dumpit_utf8(result) + res.status = falcon.HTTP_200 + + @resource_api.Restify('/v2.0/stream-definitions', method='get') + def do_get_stream_definitions(self, req, res): + + helpers.validate_authorization(req, self._default_authorized_roles) + tenant_id = helpers.get_tenant_id(req) + name = helpers.get_query_name(req) + offset = helpers.normalize_offset(helpers.get_query_param(req, + 'offset')) + limit = helpers.get_query_param(req, 'limit') + result = self._stream_definition_list(tenant_id, name, + req.uri, offset, limit) + + res.body = helpers.dumpit_utf8(result) + res.status = falcon.HTTP_200 + + @resource_api.Restify( + '/v2.0/stream-definitions/{id}', method='delete') + def do_delete_stream_definitions(self, req, res, id): + + helpers.validate_authorization(req, self._default_authorized_roles) + tenant_id = helpers.get_tenant_id(req) + self._stream_definition_delete(tenant_id, id) + res.status = falcon.HTTP_204 + + @resource.resource_try_catch_block + def _stream_definition_delete(self, tenant_id, id): + + stream_definition_row = ( + self._stream_definitions_repo.get_stream_definition(tenant_id, id)) + + if not self._stream_definitions_repo.delete_stream_definition( + tenant_id, id): + raise falcon.HTTPNotFound + + self._send_stream_definition_deleted_event( + id, tenant_id, stream_definition_row['name']) + + def _validate_stream_definition(self, stream_definition): + + try: + schema_streams.validate(stream_definition) + except schemas_exceptions.ValidationException as ex: + LOG.debug(ex) + raise falcon.HTTPBadRequest('Bad request', ex.message) + + @resource.resource_try_catch_block + def _stream_definition_create(self, tenant_id, name, + description, select, group_by, + fire_criteria, expiration, + fire_actions, expire_actions): + + stream_definition_id = ( + self._stream_definitions_repo. + create_stream_definition(tenant_id, + name, + description, + json.dumps(select), + json.dumps(group_by), + json.dumps(fire_criteria), + expiration, + fire_actions, + expire_actions)) + + self._send_stream_definition_created_event(tenant_id, + stream_definition_id, + name, + select, + group_by, + fire_criteria, + expiration) + result = ( + {u'name': name, + u'id': stream_definition_id, + u'description': description, + u'select': select, + u'group_by': group_by, + u'fire_criteria': fire_criteria, + u'expiration': expiration, + u'fire_actions': fire_actions, + u'expire_actions': expire_actions, + u'actions_enabled': u'true'} + ) + + return result + + def send_event(self, message_queue, event_msg): + try: + message_queue.send_message( + helpers.dumpit_utf8(event_msg)) + except message_queue_exceptions.MessageQueueException as ex: + LOG.exception(ex) + raise falcon.HTTPInternalServerError( + 'Message queue service unavailable'.encode('utf8'), + ex.message.encode('utf8')) + + @resource.resource_try_catch_block + def _stream_definition_show(self, tenant_id, id): + + stream_definition_row = ( + self._stream_definitions_repo.get_stream_definition(tenant_id, id)) + + return self._build_stream_definition_show_result(stream_definition_row) + + @resource.resource_try_catch_block + def _stream_definition_list(self, tenant_id, name, req_uri, + offset, limit): + + stream_definition_rows = ( + self._stream_definitions_repo.get_stream_definitions( + tenant_id, name, offset, limit)) + result = [] + for stream_definition_row in stream_definition_rows: + sd = self._build_stream_definition_show_result( + stream_definition_row) + helpers.add_links_to_resource(sd, req_uri) + result.append(sd) + + result = helpers.paginate(result, req_uri, offset) + + return result + + def _build_stream_definition_show_result(self, stream_definition_row): + + fire_actions_list = get_comma_separated_str_as_list( + stream_definition_row['fire_actions']) + + expire_actions_list = get_comma_separated_str_as_list( + stream_definition_row['expire_actions']) + + result = ( + {u'name': stream_definition_row['name'], + u'id': stream_definition_row['id'], + u'description': stream_definition_row['description'], + u'select': json.loads(stream_definition_row['select_by']), + u'group_by': json.loads(stream_definition_row['group_by']), + u'fire_criteria': json.loads( + stream_definition_row['fire_criteria']), + u'expiration': stream_definition_row['expiration'], + u'fire_actions': fire_actions_list, + u'expire_actions': expire_actions_list, + u'actions_enabled': stream_definition_row['actions_enabled'] == 1, + u'created_at': stream_definition_row['created_at'].isoformat(), + u'updated_at': stream_definition_row['updated_at'].isoformat()} + ) + + return result + + def _send_stream_definition_deleted_event(self, stream_definition_id, + tenant_id, stream_name): + + stream_definition_deleted_event_msg = { + u"stream-definition-deleted": {u'tenant_id': tenant_id, + u'stream_definition_id': + stream_definition_id, + u'name': stream_name}} + + self.send_event(self.stream_definition_event_message_queue, + stream_definition_deleted_event_msg) + + def _send_stream_definition_created_event(self, tenant_id, + stream_definition_id, + name, + select, + group_by, + fire_criteria, + expiration): + + stream_definition_created_event_msg = { + u'stream-definition-created': {u'tenant_id': tenant_id, + u'stream_definition_id': + stream_definition_id, + u'name': name, + u'select': select, + u'group_by': group_by, + u'fire_criteria': fire_criteria, + u'expiration': expiration} + } + + self.send_event(self.stream_definition_event_message_queue, + stream_definition_created_event_msg) + + +def get_query_stream_definition_name(stream_definition): + return (stream_definition['name']) + + +def get_query_stream_definition_description(stream_definition, + return_none=False): + if 'description' in stream_definition: + return stream_definition['description'] + else: + if return_none: + return None + else: + return '' + + +def get_query_stream_definition_fire_actions(stream_definition, + return_none=False): + if 'fire_actions' in stream_definition: + return stream_definition['fire_actions'] + else: + if return_none: + return None + else: + return [] + + +def get_query_stream_definition_expire_actions(stream_definition, + return_none=False): + if 'expire_actions' in stream_definition: + return stream_definition['expire_actions'] + else: + if return_none: + return None + else: + return [] + + +def get_query_stream_definition_actions_enabled(stream_definition, + required=False, + return_none=False): + try: + if 'actions_enabled' in stream_definition: + return (stream_definition['actions_enabled']) + else: + if return_none: + return None + elif required: + raise Exception("Missing actions-enabled") + else: + return '' + except Exception as ex: + LOG.debug(ex) + raise falcon.HTTPBadRequest('Bad request', ex.message) + + +def get_comma_separated_str_as_list(comma_separated_str): + if not comma_separated_str: + return [] + else: + return comma_separated_str.decode('utf8').split(',') diff --git a/setup.cfg b/setup.cfg index d87002c1f..5a5a4b4f4 100755 --- a/setup.cfg +++ b/setup.cfg @@ -44,6 +44,7 @@ monasca.dispatcher = v2_ref_transforms = monasca.v2.reference.transforms:Transforms v2_ref_notifications = monasca.v2.reference.notifications:Notifications demo = monasca.v2.reference.demo_dispatcher:DemoDispatcher + v2_ref_stream_definitions = monasca.v2.reference.stream_definitions:StreamDefinitions paste.filter_factory = login = monasca.middleware.login:filter_factory @@ -65,6 +66,7 @@ monasca.repositories = mysql_alarms_repo = monasca.common.repositories.mysql.alarms_repository:AlarmsRepository mysql_notifications_repo = monasca.common.repositories.mysql.notifications_repository:NotificationsRepository mysql_events_repo = monasca.common.repositories.mysql.events_repository:EventsRepository + mysql_streams_repo = monasca.common.repositories.mysql.streams_repository:StreamsRepository [pbr] warnerrors = True