Merge pull request #14 from hpcloud-mon/feature/stream_definition_patching

Feature/stream definition patching
This commit is contained in:
Joe Keen 2015-11-20 13:32:30 -07:00
commit dba3061494
4 changed files with 270 additions and 3 deletions

View File

@ -32,3 +32,6 @@ class StreamDefinitionsV2API(object):
def on_delete(self, req, res, stream_id):
res.status = '501 Not Implemented'
def on_patch(self, req, res, stream_id):
res.status = '501 Not Implemented'

View File

@ -206,6 +206,112 @@ class StreamsRepository(mysql_repository.MySQLRepository,
return stream_definition_id
@mysql_repository.mysql_try_catch_block
def patch_stream_definition(self, tenant_id, stream_definition_id, name, description, select, group_by,
fire_criteria, expiration, fire_actions, expire_actions):
cnxn, cursor = self._get_cnxn_cursor_tuple()
with cnxn:
# Get the original alarm definition from the DB
parms = [tenant_id, stream_definition_id]
where_clause = """ where sd.tenant_id = %s
and sd.id = %s"""
query = StreamsRepository.base_query + where_clause
cursor.execute(query, parms)
if cursor.rowcount < 1:
raise exceptions.DoesNotExistException
original_definition = cursor.fetchall()[0]
# Update that stream definition in the database
patch_query = """
update stream_definition
set name = %s,
description = %s,
select_by = %s,
group_by = %s,
fire_criteria = %s,
expiration = %s,
updated_at = %s
where tenant_id = %s and id = %s"""
if name is None:
name = original_definition['name']
if description is None:
description = original_definition['description']
if select is None:
select = original_definition['select_by']
if select != original_definition['select_by']:
msg = "select_by must not change".encode('utf8')
raise exceptions.InvalidUpdateException(msg)
if group_by is None:
group_by = original_definition['group_by']
if group_by != original_definition['group_by']:
msg = "group_by must not change".encode('utf8')
raise exceptions.InvalidUpdateException(msg)
if fire_criteria is None:
fire_criteria = original_definition['fire_criteria']
if expiration is None:
expiration = original_definition['expiration']
now = timeutils.utcnow()
update_parms = [
name,
description,
select,
group_by,
fire_criteria,
expiration,
now,
tenant_id,
stream_definition_id]
cursor.execute(patch_query, update_parms)
# Update the fire and expire actions in the database if defined
if fire_actions is not None:
self._delete_stream_actions(cursor, stream_definition_id,
u'FIRE')
if expire_actions is not None:
self._delete_stream_actions(cursor, stream_definition_id,
u'EXPIRE')
self._insert_into_stream_actions(cursor, stream_definition_id,
fire_actions,
u"FIRE")
self._insert_into_stream_actions(cursor, stream_definition_id,
expire_actions,
u"EXPIRE")
# Get updated entry from mysql
cursor.execute(query, parms)
return cursor.fetchall()[0]
def _delete_stream_actions(self, cursor, stream_definition_id, action_type):
query = """
delete
from stream_actions
where stream_definition_id = %s and action_type = %s
"""
parms = [stream_definition_id, action_type.encode('utf8')]
cursor.execute(query, parms)
def _insert_into_stream_actions(self, cursor, stream_definition_id,
actions, action_type):

View File

@ -46,3 +46,17 @@ class StreamsRepository(object):
@abc.abstractmethod
def get_stream_definitions(self, tenant_id, name, offset, limit):
pass
@abc.abstractmethod
def patch_stream_definition(self,
tenant_id,
stream_definition_id,
name,
description,
select,
group_by,
fire_criteria,
expiration,
fire_actions,
expire_actions):
pass

View File

@ -119,6 +119,47 @@ class StreamDefinitions(stream_definitions_api_v2.StreamDefinitionsV2API):
res.body = helpers.dumpit_utf8(result)
res.status = falcon.HTTP_200
def on_patch(self, req, res, stream_id):
helpers.validate_authorization(req, self._default_authorized_roles)
stream_definition = helpers.read_json_msg_body(req)
tenant_id = helpers.get_tenant_id(req)
name = get_query_stream_definition_name(stream_definition, return_none=True)
description = get_query_stream_definition_description(
stream_definition, return_none=True)
select = get_query_stream_definition_select(stream_definition, return_none=True)
if select:
for s in select:
if 'traits' in s:
s['traits']['_tenant_id'] = tenant_id
else:
s['traits'] = {'_tenant_id': tenant_id}
group_by = get_query_stream_definition_group_by(stream_definition, return_none=True)
fire_criteria = get_query_stream_definition_fire_criteria(stream_definition, return_none=True)
expiration = get_query_stream_definition_expiration(stream_definition, return_none=True)
fire_actions = get_query_stream_definition_fire_actions(
stream_definition, return_none=True)
expire_actions = get_query_stream_definition_expire_actions(
stream_definition, return_none=True)
result = self._stream_definition_patch(tenant_id,
stream_id,
name,
description,
select,
group_by,
fire_criteria,
expiration,
fire_actions,
expire_actions)
helpers.add_links_to_resource(result, req.uri)
res.body = helpers.dumpit_utf8(result)
res.status = falcon.HTTP_201
def on_delete(self, req, res, stream_id):
helpers.validate_authorization(req, self._default_authorized_roles)
tenant_id = helpers.get_tenant_id(req)
@ -196,6 +237,37 @@ class StreamDefinitions(stream_definitions_api_v2.StreamDefinitionsV2API):
return result
@resource.resource_try_catch_block
def _stream_definition_patch(self, tenant_id, stream_definition_id, name,
description, select, group_by,
fire_criteria, expiration,
fire_actions, expire_actions):
stream_definition_row = (
self._stream_definitions_repo.patch_stream_definition(tenant_id,
stream_definition_id,
name,
description,
None if select is None else json.dumps(select),
None if group_by is None else json.dumps(group_by),
None if fire_criteria is None else json.dumps(
fire_criteria),
expiration,
fire_actions,
expire_actions))
self._send_stream_definition_updated_event(tenant_id,
stream_definition_id,
name,
select,
group_by,
fire_criteria,
expiration)
result = self._build_stream_definition_show_result(stream_definition_row)
return result
def send_event(self, message_queue, event_msg):
try:
message_queue.send_message(
@ -300,9 +372,37 @@ class StreamDefinitions(stream_definitions_api_v2.StreamDefinitionsV2API):
self.send_event(self.stream_definition_event_message_queue,
stream_definition_created_event_msg)
def _send_stream_definition_updated_event(self, tenant_id,
stream_definition_id,
name,
select,
group_by,
fire_criteria,
expiration):
def get_query_stream_definition_name(stream_definition):
return (stream_definition['name'])
stream_definition_created_event_msg = {
u'stream-definition-updated': {u'tenant_id': tenant_id,
u'stream_definition_id':
stream_definition_id,
u'name': name,
u'select': select,
u'group_by': group_by,
u'fire_criteria': fire_criteria,
u'expiration': expiration}
}
self.send_event(self.stream_definition_event_message_queue,
stream_definition_created_event_msg)
def get_query_stream_definition_name(stream_definition, return_none=False):
if 'name' in stream_definition:
return stream_definition['name']
else:
if return_none:
return None
else:
return ''
def get_query_stream_definition_description(stream_definition,
@ -316,6 +416,50 @@ def get_query_stream_definition_description(stream_definition,
return ''
def get_query_stream_definition_select(stream_definition,
return_none=False):
if 'select' in stream_definition:
return stream_definition['select']
else:
if return_none:
return None
else:
return ''
def get_query_stream_definition_group_by(stream_definition,
return_none=False):
if 'group_by' in stream_definition:
return stream_definition['group_by']
else:
if return_none:
return None
else:
return []
def get_query_stream_definition_fire_criteria(stream_definition,
return_none=False):
if 'fire_criteria' in stream_definition:
return stream_definition['fire_criteria']
else:
if return_none:
return None
else:
return ''
def get_query_stream_definition_expiration(stream_definition,
return_none=False):
if 'expiration' in stream_definition:
return stream_definition['expiration']
else:
if return_none:
return None
else:
return ''
def get_query_stream_definition_fire_actions(stream_definition,
return_none=False):
if 'fire_actions' in stream_definition:
@ -343,7 +487,7 @@ def get_query_stream_definition_actions_enabled(stream_definition,
return_none=False):
try:
if 'actions_enabled' in stream_definition:
return (stream_definition['actions_enabled'])
return stream_definition['actions_enabled']
else:
if return_none:
return None