Merge pull request #14 from hpcloud-mon/feature/stream_definition_patching
Feature/stream definition patching
This commit is contained in:
commit
dba3061494
|
@ -32,3 +32,6 @@ class StreamDefinitionsV2API(object):
|
|||
|
||||
def on_delete(self, req, res, stream_id):
|
||||
res.status = '501 Not Implemented'
|
||||
|
||||
def on_patch(self, req, res, stream_id):
|
||||
res.status = '501 Not Implemented'
|
||||
|
|
|
@ -206,6 +206,112 @@ class StreamsRepository(mysql_repository.MySQLRepository,
|
|||
|
||||
return stream_definition_id
|
||||
|
||||
@mysql_repository.mysql_try_catch_block
|
||||
def patch_stream_definition(self, tenant_id, stream_definition_id, name, description, select, group_by,
|
||||
fire_criteria, expiration, fire_actions, expire_actions):
|
||||
|
||||
cnxn, cursor = self._get_cnxn_cursor_tuple()
|
||||
|
||||
with cnxn:
|
||||
# Get the original alarm definition from the DB
|
||||
parms = [tenant_id, stream_definition_id]
|
||||
|
||||
where_clause = """ where sd.tenant_id = %s
|
||||
and sd.id = %s"""
|
||||
query = StreamsRepository.base_query + where_clause
|
||||
|
||||
cursor.execute(query, parms)
|
||||
|
||||
if cursor.rowcount < 1:
|
||||
raise exceptions.DoesNotExistException
|
||||
|
||||
original_definition = cursor.fetchall()[0]
|
||||
|
||||
# Update that stream definition in the database
|
||||
|
||||
patch_query = """
|
||||
update stream_definition
|
||||
set name = %s,
|
||||
description = %s,
|
||||
select_by = %s,
|
||||
group_by = %s,
|
||||
fire_criteria = %s,
|
||||
expiration = %s,
|
||||
updated_at = %s
|
||||
where tenant_id = %s and id = %s"""
|
||||
|
||||
if name is None:
|
||||
name = original_definition['name']
|
||||
|
||||
if description is None:
|
||||
description = original_definition['description']
|
||||
|
||||
if select is None:
|
||||
select = original_definition['select_by']
|
||||
|
||||
if select != original_definition['select_by']:
|
||||
msg = "select_by must not change".encode('utf8')
|
||||
raise exceptions.InvalidUpdateException(msg)
|
||||
|
||||
if group_by is None:
|
||||
group_by = original_definition['group_by']
|
||||
|
||||
if group_by != original_definition['group_by']:
|
||||
msg = "group_by must not change".encode('utf8')
|
||||
raise exceptions.InvalidUpdateException(msg)
|
||||
|
||||
if fire_criteria is None:
|
||||
fire_criteria = original_definition['fire_criteria']
|
||||
|
||||
if expiration is None:
|
||||
expiration = original_definition['expiration']
|
||||
|
||||
now = timeutils.utcnow()
|
||||
|
||||
update_parms = [
|
||||
name,
|
||||
description,
|
||||
select,
|
||||
group_by,
|
||||
fire_criteria,
|
||||
expiration,
|
||||
now,
|
||||
tenant_id,
|
||||
stream_definition_id]
|
||||
|
||||
cursor.execute(patch_query, update_parms)
|
||||
|
||||
# Update the fire and expire actions in the database if defined
|
||||
|
||||
if fire_actions is not None:
|
||||
self._delete_stream_actions(cursor, stream_definition_id,
|
||||
u'FIRE')
|
||||
if expire_actions is not None:
|
||||
self._delete_stream_actions(cursor, stream_definition_id,
|
||||
u'EXPIRE')
|
||||
|
||||
self._insert_into_stream_actions(cursor, stream_definition_id,
|
||||
fire_actions,
|
||||
u"FIRE")
|
||||
self._insert_into_stream_actions(cursor, stream_definition_id,
|
||||
expire_actions,
|
||||
u"EXPIRE")
|
||||
|
||||
# Get updated entry from mysql
|
||||
cursor.execute(query, parms)
|
||||
|
||||
return cursor.fetchall()[0]
|
||||
|
||||
def _delete_stream_actions(self, cursor, stream_definition_id, action_type):
|
||||
|
||||
query = """
|
||||
delete
|
||||
from stream_actions
|
||||
where stream_definition_id = %s and action_type = %s
|
||||
"""
|
||||
parms = [stream_definition_id, action_type.encode('utf8')]
|
||||
cursor.execute(query, parms)
|
||||
|
||||
def _insert_into_stream_actions(self, cursor, stream_definition_id,
|
||||
actions, action_type):
|
||||
|
||||
|
|
|
@ -46,3 +46,17 @@ class StreamsRepository(object):
|
|||
@abc.abstractmethod
|
||||
def get_stream_definitions(self, tenant_id, name, offset, limit):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def patch_stream_definition(self,
|
||||
tenant_id,
|
||||
stream_definition_id,
|
||||
name,
|
||||
description,
|
||||
select,
|
||||
group_by,
|
||||
fire_criteria,
|
||||
expiration,
|
||||
fire_actions,
|
||||
expire_actions):
|
||||
pass
|
||||
|
|
|
@ -119,6 +119,47 @@ class StreamDefinitions(stream_definitions_api_v2.StreamDefinitionsV2API):
|
|||
res.body = helpers.dumpit_utf8(result)
|
||||
res.status = falcon.HTTP_200
|
||||
|
||||
def on_patch(self, req, res, stream_id):
|
||||
helpers.validate_authorization(req, self._default_authorized_roles)
|
||||
|
||||
stream_definition = helpers.read_json_msg_body(req)
|
||||
|
||||
tenant_id = helpers.get_tenant_id(req)
|
||||
|
||||
name = get_query_stream_definition_name(stream_definition, return_none=True)
|
||||
description = get_query_stream_definition_description(
|
||||
stream_definition, return_none=True)
|
||||
select = get_query_stream_definition_select(stream_definition, return_none=True)
|
||||
if select:
|
||||
for s in select:
|
||||
if 'traits' in s:
|
||||
s['traits']['_tenant_id'] = tenant_id
|
||||
else:
|
||||
s['traits'] = {'_tenant_id': tenant_id}
|
||||
|
||||
group_by = get_query_stream_definition_group_by(stream_definition, return_none=True)
|
||||
fire_criteria = get_query_stream_definition_fire_criteria(stream_definition, return_none=True)
|
||||
expiration = get_query_stream_definition_expiration(stream_definition, return_none=True)
|
||||
fire_actions = get_query_stream_definition_fire_actions(
|
||||
stream_definition, return_none=True)
|
||||
expire_actions = get_query_stream_definition_expire_actions(
|
||||
stream_definition, return_none=True)
|
||||
|
||||
result = self._stream_definition_patch(tenant_id,
|
||||
stream_id,
|
||||
name,
|
||||
description,
|
||||
select,
|
||||
group_by,
|
||||
fire_criteria,
|
||||
expiration,
|
||||
fire_actions,
|
||||
expire_actions)
|
||||
|
||||
helpers.add_links_to_resource(result, req.uri)
|
||||
res.body = helpers.dumpit_utf8(result)
|
||||
res.status = falcon.HTTP_201
|
||||
|
||||
def on_delete(self, req, res, stream_id):
|
||||
helpers.validate_authorization(req, self._default_authorized_roles)
|
||||
tenant_id = helpers.get_tenant_id(req)
|
||||
|
@ -196,6 +237,37 @@ class StreamDefinitions(stream_definitions_api_v2.StreamDefinitionsV2API):
|
|||
|
||||
return result
|
||||
|
||||
@resource.resource_try_catch_block
|
||||
def _stream_definition_patch(self, tenant_id, stream_definition_id, name,
|
||||
description, select, group_by,
|
||||
fire_criteria, expiration,
|
||||
fire_actions, expire_actions):
|
||||
|
||||
stream_definition_row = (
|
||||
self._stream_definitions_repo.patch_stream_definition(tenant_id,
|
||||
stream_definition_id,
|
||||
name,
|
||||
description,
|
||||
None if select is None else json.dumps(select),
|
||||
None if group_by is None else json.dumps(group_by),
|
||||
None if fire_criteria is None else json.dumps(
|
||||
fire_criteria),
|
||||
expiration,
|
||||
fire_actions,
|
||||
expire_actions))
|
||||
|
||||
self._send_stream_definition_updated_event(tenant_id,
|
||||
stream_definition_id,
|
||||
name,
|
||||
select,
|
||||
group_by,
|
||||
fire_criteria,
|
||||
expiration)
|
||||
|
||||
result = self._build_stream_definition_show_result(stream_definition_row)
|
||||
|
||||
return result
|
||||
|
||||
def send_event(self, message_queue, event_msg):
|
||||
try:
|
||||
message_queue.send_message(
|
||||
|
@ -300,9 +372,37 @@ class StreamDefinitions(stream_definitions_api_v2.StreamDefinitionsV2API):
|
|||
self.send_event(self.stream_definition_event_message_queue,
|
||||
stream_definition_created_event_msg)
|
||||
|
||||
def _send_stream_definition_updated_event(self, tenant_id,
|
||||
stream_definition_id,
|
||||
name,
|
||||
select,
|
||||
group_by,
|
||||
fire_criteria,
|
||||
expiration):
|
||||
|
||||
def get_query_stream_definition_name(stream_definition):
|
||||
return (stream_definition['name'])
|
||||
stream_definition_created_event_msg = {
|
||||
u'stream-definition-updated': {u'tenant_id': tenant_id,
|
||||
u'stream_definition_id':
|
||||
stream_definition_id,
|
||||
u'name': name,
|
||||
u'select': select,
|
||||
u'group_by': group_by,
|
||||
u'fire_criteria': fire_criteria,
|
||||
u'expiration': expiration}
|
||||
}
|
||||
|
||||
self.send_event(self.stream_definition_event_message_queue,
|
||||
stream_definition_created_event_msg)
|
||||
|
||||
|
||||
def get_query_stream_definition_name(stream_definition, return_none=False):
|
||||
if 'name' in stream_definition:
|
||||
return stream_definition['name']
|
||||
else:
|
||||
if return_none:
|
||||
return None
|
||||
else:
|
||||
return ''
|
||||
|
||||
|
||||
def get_query_stream_definition_description(stream_definition,
|
||||
|
@ -316,6 +416,50 @@ def get_query_stream_definition_description(stream_definition,
|
|||
return ''
|
||||
|
||||
|
||||
def get_query_stream_definition_select(stream_definition,
|
||||
return_none=False):
|
||||
if 'select' in stream_definition:
|
||||
return stream_definition['select']
|
||||
else:
|
||||
if return_none:
|
||||
return None
|
||||
else:
|
||||
return ''
|
||||
|
||||
|
||||
def get_query_stream_definition_group_by(stream_definition,
|
||||
return_none=False):
|
||||
if 'group_by' in stream_definition:
|
||||
return stream_definition['group_by']
|
||||
else:
|
||||
if return_none:
|
||||
return None
|
||||
else:
|
||||
return []
|
||||
|
||||
|
||||
def get_query_stream_definition_fire_criteria(stream_definition,
|
||||
return_none=False):
|
||||
if 'fire_criteria' in stream_definition:
|
||||
return stream_definition['fire_criteria']
|
||||
else:
|
||||
if return_none:
|
||||
return None
|
||||
else:
|
||||
return ''
|
||||
|
||||
|
||||
def get_query_stream_definition_expiration(stream_definition,
|
||||
return_none=False):
|
||||
if 'expiration' in stream_definition:
|
||||
return stream_definition['expiration']
|
||||
else:
|
||||
if return_none:
|
||||
return None
|
||||
else:
|
||||
return ''
|
||||
|
||||
|
||||
def get_query_stream_definition_fire_actions(stream_definition,
|
||||
return_none=False):
|
||||
if 'fire_actions' in stream_definition:
|
||||
|
@ -343,7 +487,7 @@ def get_query_stream_definition_actions_enabled(stream_definition,
|
|||
return_none=False):
|
||||
try:
|
||||
if 'actions_enabled' in stream_definition:
|
||||
return (stream_definition['actions_enabled'])
|
||||
return stream_definition['actions_enabled']
|
||||
else:
|
||||
if return_none:
|
||||
return None
|
||||
|
|
Loading…
Reference in New Issue