summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoe Keen <joe.keen@hp.com>2015-11-20 13:32:30 -0700
committerJoe Keen <joe.keen@hp.com>2015-11-20 13:32:30 -0700
commitdba30614943d6a003620336540a2774e238a567b (patch)
tree8ddca4088c10dadd09ef6d767c91a8dd6fa7b2b8
parent3829d1287d80194d4f911f24dacc5edec86fc5d7 (diff)
parentc00fd08178553b01b0bcda7bce2ffa9c0e789731 (diff)
Merge pull request #14 from hpcloud-mon/feature/stream_definition_patching
Feature/stream definition patching
-rw-r--r--monasca_events_api/api/stream_definitions_api_v2.py3
-rw-r--r--monasca_events_api/common/repositories/mysql/streams_repository.py106
-rw-r--r--monasca_events_api/common/repositories/streams_repository.py14
-rw-r--r--monasca_events_api/v2/stream_definitions.py150
4 files changed, 270 insertions, 3 deletions
diff --git a/monasca_events_api/api/stream_definitions_api_v2.py b/monasca_events_api/api/stream_definitions_api_v2.py
index 5e2e3e7..6b3e965 100644
--- a/monasca_events_api/api/stream_definitions_api_v2.py
+++ b/monasca_events_api/api/stream_definitions_api_v2.py
@@ -32,3 +32,6 @@ class StreamDefinitionsV2API(object):
32 32
33 def on_delete(self, req, res, stream_id): 33 def on_delete(self, req, res, stream_id):
34 res.status = '501 Not Implemented' 34 res.status = '501 Not Implemented'
35
36 def on_patch(self, req, res, stream_id):
37 res.status = '501 Not Implemented'
diff --git a/monasca_events_api/common/repositories/mysql/streams_repository.py b/monasca_events_api/common/repositories/mysql/streams_repository.py
index dfac70f..836bcd9 100644
--- a/monasca_events_api/common/repositories/mysql/streams_repository.py
+++ b/monasca_events_api/common/repositories/mysql/streams_repository.py
@@ -206,6 +206,112 @@ class StreamsRepository(mysql_repository.MySQLRepository,
206 206
207 return stream_definition_id 207 return stream_definition_id
208 208
209 @mysql_repository.mysql_try_catch_block
210 def patch_stream_definition(self, tenant_id, stream_definition_id, name, description, select, group_by,
211 fire_criteria, expiration, fire_actions, expire_actions):
212
213 cnxn, cursor = self._get_cnxn_cursor_tuple()
214
215 with cnxn:
216 # Get the original alarm definition from the DB
217 parms = [tenant_id, stream_definition_id]
218
219 where_clause = """ where sd.tenant_id = %s
220 and sd.id = %s"""
221 query = StreamsRepository.base_query + where_clause
222
223 cursor.execute(query, parms)
224
225 if cursor.rowcount < 1:
226 raise exceptions.DoesNotExistException
227
228 original_definition = cursor.fetchall()[0]
229
230 # Update that stream definition in the database
231
232 patch_query = """
233 update stream_definition
234 set name = %s,
235 description = %s,
236 select_by = %s,
237 group_by = %s,
238 fire_criteria = %s,
239 expiration = %s,
240 updated_at = %s
241 where tenant_id = %s and id = %s"""
242
243 if name is None:
244 name = original_definition['name']
245
246 if description is None:
247 description = original_definition['description']
248
249 if select is None:
250 select = original_definition['select_by']
251
252 if select != original_definition['select_by']:
253 msg = "select_by must not change".encode('utf8')
254 raise exceptions.InvalidUpdateException(msg)
255
256 if group_by is None:
257 group_by = original_definition['group_by']
258
259 if group_by != original_definition['group_by']:
260 msg = "group_by must not change".encode('utf8')
261 raise exceptions.InvalidUpdateException(msg)
262
263 if fire_criteria is None:
264 fire_criteria = original_definition['fire_criteria']
265
266 if expiration is None:
267 expiration = original_definition['expiration']
268
269 now = timeutils.utcnow()
270
271 update_parms = [
272 name,
273 description,
274 select,
275 group_by,
276 fire_criteria,
277 expiration,
278 now,
279 tenant_id,
280 stream_definition_id]
281
282 cursor.execute(patch_query, update_parms)
283
284 # Update the fire and expire actions in the database if defined
285
286 if fire_actions is not None:
287 self._delete_stream_actions(cursor, stream_definition_id,
288 u'FIRE')
289 if expire_actions is not None:
290 self._delete_stream_actions(cursor, stream_definition_id,
291 u'EXPIRE')
292
293 self._insert_into_stream_actions(cursor, stream_definition_id,
294 fire_actions,
295 u"FIRE")
296 self._insert_into_stream_actions(cursor, stream_definition_id,
297 expire_actions,
298 u"EXPIRE")
299
300 # Get updated entry from mysql
301 cursor.execute(query, parms)
302
303 return cursor.fetchall()[0]
304
305 def _delete_stream_actions(self, cursor, stream_definition_id, action_type):
306
307 query = """
308 delete
309 from stream_actions
310 where stream_definition_id = %s and action_type = %s
311 """
312 parms = [stream_definition_id, action_type.encode('utf8')]
313 cursor.execute(query, parms)
314
209 def _insert_into_stream_actions(self, cursor, stream_definition_id, 315 def _insert_into_stream_actions(self, cursor, stream_definition_id,
210 actions, action_type): 316 actions, action_type):
211 317
diff --git a/monasca_events_api/common/repositories/streams_repository.py b/monasca_events_api/common/repositories/streams_repository.py
index 4bbc3ce..38da4f9 100644
--- a/monasca_events_api/common/repositories/streams_repository.py
+++ b/monasca_events_api/common/repositories/streams_repository.py
@@ -46,3 +46,17 @@ class StreamsRepository(object):
46 @abc.abstractmethod 46 @abc.abstractmethod
47 def get_stream_definitions(self, tenant_id, name, offset, limit): 47 def get_stream_definitions(self, tenant_id, name, offset, limit):
48 pass 48 pass
49
50 @abc.abstractmethod
51 def patch_stream_definition(self,
52 tenant_id,
53 stream_definition_id,
54 name,
55 description,
56 select,
57 group_by,
58 fire_criteria,
59 expiration,
60 fire_actions,
61 expire_actions):
62 pass
diff --git a/monasca_events_api/v2/stream_definitions.py b/monasca_events_api/v2/stream_definitions.py
index cde4549..f822629 100644
--- a/monasca_events_api/v2/stream_definitions.py
+++ b/monasca_events_api/v2/stream_definitions.py
@@ -119,6 +119,47 @@ class StreamDefinitions(stream_definitions_api_v2.StreamDefinitionsV2API):
119 res.body = helpers.dumpit_utf8(result) 119 res.body = helpers.dumpit_utf8(result)
120 res.status = falcon.HTTP_200 120 res.status = falcon.HTTP_200
121 121
122 def on_patch(self, req, res, stream_id):
123 helpers.validate_authorization(req, self._default_authorized_roles)
124
125 stream_definition = helpers.read_json_msg_body(req)
126
127 tenant_id = helpers.get_tenant_id(req)
128
129 name = get_query_stream_definition_name(stream_definition, return_none=True)
130 description = get_query_stream_definition_description(
131 stream_definition, return_none=True)
132 select = get_query_stream_definition_select(stream_definition, return_none=True)
133 if select:
134 for s in select:
135 if 'traits' in s:
136 s['traits']['_tenant_id'] = tenant_id
137 else:
138 s['traits'] = {'_tenant_id': tenant_id}
139
140 group_by = get_query_stream_definition_group_by(stream_definition, return_none=True)
141 fire_criteria = get_query_stream_definition_fire_criteria(stream_definition, return_none=True)
142 expiration = get_query_stream_definition_expiration(stream_definition, return_none=True)
143 fire_actions = get_query_stream_definition_fire_actions(
144 stream_definition, return_none=True)
145 expire_actions = get_query_stream_definition_expire_actions(
146 stream_definition, return_none=True)
147
148 result = self._stream_definition_patch(tenant_id,
149 stream_id,
150 name,
151 description,
152 select,
153 group_by,
154 fire_criteria,
155 expiration,
156 fire_actions,
157 expire_actions)
158
159 helpers.add_links_to_resource(result, req.uri)
160 res.body = helpers.dumpit_utf8(result)
161 res.status = falcon.HTTP_201
162
122 def on_delete(self, req, res, stream_id): 163 def on_delete(self, req, res, stream_id):
123 helpers.validate_authorization(req, self._default_authorized_roles) 164 helpers.validate_authorization(req, self._default_authorized_roles)
124 tenant_id = helpers.get_tenant_id(req) 165 tenant_id = helpers.get_tenant_id(req)
@@ -196,6 +237,37 @@ class StreamDefinitions(stream_definitions_api_v2.StreamDefinitionsV2API):
196 237
197 return result 238 return result
198 239
240 @resource.resource_try_catch_block
241 def _stream_definition_patch(self, tenant_id, stream_definition_id, name,
242 description, select, group_by,
243 fire_criteria, expiration,
244 fire_actions, expire_actions):
245
246 stream_definition_row = (
247 self._stream_definitions_repo.patch_stream_definition(tenant_id,
248 stream_definition_id,
249 name,
250 description,
251 None if select is None else json.dumps(select),
252 None if group_by is None else json.dumps(group_by),
253 None if fire_criteria is None else json.dumps(
254 fire_criteria),
255 expiration,
256 fire_actions,
257 expire_actions))
258
259 self._send_stream_definition_updated_event(tenant_id,
260 stream_definition_id,
261 name,
262 select,
263 group_by,
264 fire_criteria,
265 expiration)
266
267 result = self._build_stream_definition_show_result(stream_definition_row)
268
269 return result
270
199 def send_event(self, message_queue, event_msg): 271 def send_event(self, message_queue, event_msg):
200 try: 272 try:
201 message_queue.send_message( 273 message_queue.send_message(
@@ -300,9 +372,37 @@ class StreamDefinitions(stream_definitions_api_v2.StreamDefinitionsV2API):
300 self.send_event(self.stream_definition_event_message_queue, 372 self.send_event(self.stream_definition_event_message_queue,
301 stream_definition_created_event_msg) 373 stream_definition_created_event_msg)
302 374
375 def _send_stream_definition_updated_event(self, tenant_id,
376 stream_definition_id,
377 name,
378 select,
379 group_by,
380 fire_criteria,
381 expiration):
382
383 stream_definition_created_event_msg = {
384 u'stream-definition-updated': {u'tenant_id': tenant_id,
385 u'stream_definition_id':
386 stream_definition_id,
387 u'name': name,
388 u'select': select,
389 u'group_by': group_by,
390 u'fire_criteria': fire_criteria,
391 u'expiration': expiration}
392 }
393
394 self.send_event(self.stream_definition_event_message_queue,
395 stream_definition_created_event_msg)
396
303 397
304def get_query_stream_definition_name(stream_definition): 398def get_query_stream_definition_name(stream_definition, return_none=False):
305 return (stream_definition['name']) 399 if 'name' in stream_definition:
400 return stream_definition['name']
401 else:
402 if return_none:
403 return None
404 else:
405 return ''
306 406
307 407
308def get_query_stream_definition_description(stream_definition, 408def get_query_stream_definition_description(stream_definition,
@@ -316,6 +416,50 @@ def get_query_stream_definition_description(stream_definition,
316 return '' 416 return ''
317 417
318 418
419def get_query_stream_definition_select(stream_definition,
420 return_none=False):
421 if 'select' in stream_definition:
422 return stream_definition['select']
423 else:
424 if return_none:
425 return None
426 else:
427 return ''
428
429
430def get_query_stream_definition_group_by(stream_definition,
431 return_none=False):
432 if 'group_by' in stream_definition:
433 return stream_definition['group_by']
434 else:
435 if return_none:
436 return None
437 else:
438 return []
439
440
441def get_query_stream_definition_fire_criteria(stream_definition,
442 return_none=False):
443 if 'fire_criteria' in stream_definition:
444 return stream_definition['fire_criteria']
445 else:
446 if return_none:
447 return None
448 else:
449 return ''
450
451
452def get_query_stream_definition_expiration(stream_definition,
453 return_none=False):
454 if 'expiration' in stream_definition:
455 return stream_definition['expiration']
456 else:
457 if return_none:
458 return None
459 else:
460 return ''
461
462
319def get_query_stream_definition_fire_actions(stream_definition, 463def get_query_stream_definition_fire_actions(stream_definition,
320 return_none=False): 464 return_none=False):
321 if 'fire_actions' in stream_definition: 465 if 'fire_actions' in stream_definition:
@@ -343,7 +487,7 @@ def get_query_stream_definition_actions_enabled(stream_definition,
343 return_none=False): 487 return_none=False):
344 try: 488 try:
345 if 'actions_enabled' in stream_definition: 489 if 'actions_enabled' in stream_definition:
346 return (stream_definition['actions_enabled']) 490 return stream_definition['actions_enabled']
347 else: 491 else:
348 if return_none: 492 if return_none:
349 return None 493 return None