summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.openstack.org>2018-10-16 12:21:08 +0000
committerGerrit Code Review <review@openstack.org>2018-10-16 12:21:08 +0000
commita6ca0fead5419ee20ecac0f3bc1e9cb1af0c6884 (patch)
treec34358d4c4712941b4004c9421707482a04152bf
parent15c6877237cd195fbfadb82427efeb18285777fc (diff)
parent60b86fff4d1a79d6efef57bcb20c43ee53552680 (diff)
Merge "Use the tenant_id from context to name the index in ES"0.2.0
-rw-r--r--monasca_events_api/app/common/events_publisher.py101
-rw-r--r--monasca_events_api/app/controller/v1/bulk_processor.py39
-rw-r--r--monasca_events_api/app/controller/v1/events.py5
-rw-r--r--monasca_events_api/app/core/error_handlers.py4
-rw-r--r--monasca_events_api/app/core/model.py3
-rw-r--r--monasca_events_api/app/core/request.py3
-rw-r--r--monasca_events_api/app/model/envelope.py89
-rw-r--r--monasca_events_api/tests/unit/test_body_validation.py (renamed from monasca_events_api/tests/unit/test_body_valodiation.py)0
8 files changed, 148 insertions, 96 deletions
diff --git a/monasca_events_api/app/common/events_publisher.py b/monasca_events_api/app/common/events_publisher.py
index 7e49df8..7b90bc9 100644
--- a/monasca_events_api/app/common/events_publisher.py
+++ b/monasca_events_api/app/common/events_publisher.py
@@ -1,5 +1,4 @@
1# Copyright 2015 kornicameister@gmail.com 1# Copyright 2018 FUJITSU LIMITED
2# Copyright 2017 FUJITSU LIMITED
3# 2#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may 3# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain 4# not use this file except in compliance with the License. You may obtain
@@ -16,10 +15,10 @@
16import falcon 15import falcon
17from monasca_common.kafka import producer 16from monasca_common.kafka import producer
18from monasca_common.kafka_lib.common import FailedPayloadsError 17from monasca_common.kafka_lib.common import FailedPayloadsError
19from monasca_common.rest import utils as rest_utils 18from monasca_events_api.app.model import envelope as ev_envelope
20from oslo_log import log
21
22from monasca_events_api import conf 19from monasca_events_api import conf
20from oslo_log import log
21from oslo_utils import encodeutils
23 22
24 23
25LOG = log.getLogger(__name__) 24LOG = log.getLogger(__name__)
@@ -30,10 +29,6 @@ _KAFKA_META_DATA_SIZE = 32
30_TRUNCATION_SAFE_OFFSET = 1 29_TRUNCATION_SAFE_OFFSET = 1
31 30
32 31
33class InvalidMessageException(Exception):
34 pass
35
36
37class EventPublisher(object): 32class EventPublisher(object):
38 """Publishes events data to Kafka 33 """Publishes events data to Kafka
39 34
@@ -63,88 +58,20 @@ class EventPublisher(object):
63 58
64 LOG.info('Initializing EventPublisher <%s>', self) 59 LOG.info('Initializing EventPublisher <%s>', self)
65 60
66 def send_message(self, messages): 61 def _transform_message(self, message):
67 """Sends message to each configured topic. 62 """Serialize and ensure that message has proper type
68
69 Note:
70 Empty content is not shipped to kafka
71
72 :param dict| list messages:
73 """
74 if not messages:
75 return
76 if not isinstance(messages, list):
77 messages = [messages]
78
79 sent_counter = 0
80 num_of_msgs = len(messages)
81
82 LOG.debug('About to publish %d messages to %s topics',
83 num_of_msgs, self._topics)
84
85 send_messages = []
86 63
87 for message in messages:
88 try:
89 msg = self._transform_message_to_json(message)
90 send_messages.append(msg)
91 except Exception as ex:
92 LOG.exception(
93 'Failed to transform message, '
94 'this massage is dropped {} '
95 'Exception: {}'.format(message, str(ex)))
96 try:
97 self._publish(send_messages)
98 sent_counter = len(send_messages)
99 except Exception as ex:
100 LOG.exception('Failure in publishing messages to kafka')
101 raise ex
102 finally:
103 self._check_if_all_messages_was_publish(sent_counter, num_of_msgs)
104
105 def _transform_message_to_json(self, message):
106 """Transforms message into JSON.
107
108 Method transforms message to JSON and
109 encode to utf8
110 :param str message: instance of message 64 :param str message: instance of message
111 :return: serialized message 65 :return: serialized message
112 :rtype: str 66 :rtype: str
113 """ 67 """
114 msg_json = rest_utils.as_json(message) 68 serialized = ev_envelope.serialize_envelope(message)
115 return msg_json.encode('utf-8') 69 return encodeutils.safe_encode(serialized, 'utf-8')
116
117 def _create_message_for_persister_from_request_body(self, body):
118 """Create message for persister from request body
119
120 Method take original request body and them
121 transform the request to proper message format
122 acceptable by event-prsister
123 :param body: original request body
124 :return: transformed message
125 """
126 timestamp = body['timestamp']
127 final_body = []
128 for events in body['events']:
129 ev = events['event'].copy()
130 ev.update({'timestamp': timestamp})
131 final_body.append(ev)
132 return final_body
133
134 def _ensure_type_bytes(self, message):
135 """Ensures that message will have proper type.
136
137 :param str message: instance of message
138
139 """
140
141 return message.encode('utf-8')
142 70
143 def _publish(self, messages): 71 def _publish(self, messages):
144 """Publishes messages to kafka. 72 """Publishes messages to kafka.
145 73
146 :param list messages: list of messages 74 :param list messages: list of messages
147
148 """ 75 """
149 num_of_msg = len(messages) 76 num_of_msg = len(messages)
150 77
@@ -178,6 +105,17 @@ class EventPublisher(object):
178 raise falcon.HTTPServiceUnavailable('Service unavailable', 105 raise falcon.HTTPServiceUnavailable('Service unavailable',
179 str(ex), 60) 106 str(ex), 60)
180 107
108 @staticmethod
109 def _is_message_valid(message):
110 """Validates message before sending.
111
112 Methods checks if message is :py:class:`model.envelope.Envelope`.
113 By being instance of this class it is ensured that all required
114 keys are found and they will have their values.
115
116 """
117 return isinstance(message, ev_envelope.Envelope)
118
181 def _check_if_all_messages_was_publish(self, send_count, to_send_count): 119 def _check_if_all_messages_was_publish(self, send_count, to_send_count):
182 """Executed after publishing to sent metrics. 120 """Executed after publishing to sent metrics.
183 121
@@ -185,7 +123,6 @@ class EventPublisher(object):
185 :param int to_send_count: how many messages should be sent 123 :param int to_send_count: how many messages should be sent
186 124
187 """ 125 """
188
189 failed_to_send = to_send_count - send_count 126 failed_to_send = to_send_count - send_count
190 127
191 if failed_to_send == 0: 128 if failed_to_send == 0:
diff --git a/monasca_events_api/app/controller/v1/bulk_processor.py b/monasca_events_api/app/controller/v1/bulk_processor.py
index 7bf13cf..da74da5 100644
--- a/monasca_events_api/app/controller/v1/bulk_processor.py
+++ b/monasca_events_api/app/controller/v1/bulk_processor.py
@@ -1,4 +1,4 @@
1# Copyright 2017 FUJITSU LIMITED 1# Copyright 2018 FUJITSU LIMITED
2# 2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may 3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain 4# not use this file except in compliance with the License. You may obtain
@@ -12,10 +12,12 @@
12# License for the specific language governing permissions and limitations 12# License for the specific language governing permissions and limitations
13# under the License. 13# under the License.
14 14
15from oslo_log import log 15from monasca_common.rest import utils as rest_utils
16
17from monasca_events_api.app.common import events_publisher 16from monasca_events_api.app.common import events_publisher
17from monasca_events_api.app.model import envelope
18from monasca_events_api import conf 18from monasca_events_api import conf
19from oslo_log import log
20from oslo_utils import encodeutils
19 21
20LOG = log.getLogger(__name__) 22LOG = log.getLogger(__name__)
21CONF = conf.CONF 23CONF = conf.CONF
@@ -30,11 +32,11 @@ class EventsBulkProcessor(events_publisher.EventPublisher):
30 32
31 """ 33 """
32 34
33 def send_message(self, events): 35 def send_message(self, events, event_project_id=None):
34 """Sends bulk package to kafka 36 """Sends bulk package to kafka
35 37
36 :param list events: received events 38 :param list events: received events
37 39 :param str event_project_id: project id
38 """ 40 """
39 41
40 num_of_msgs = len(events) if events else 0 42 num_of_msgs = len(events) if events else 0
@@ -45,7 +47,7 @@ class EventsBulkProcessor(events_publisher.EventPublisher):
45 47
46 for ev_el in events: 48 for ev_el in events:
47 try: 49 try:
48 t_el = self._transform_message_to_json(ev_el) 50 t_el = self._transform_message(ev_el, event_project_id)
49 if t_el: 51 if t_el:
50 to_send_msgs.append(t_el) 52 to_send_msgs.append(t_el)
51 except Exception as ex: 53 except Exception as ex:
@@ -62,3 +64,28 @@ class EventsBulkProcessor(events_publisher.EventPublisher):
62 raise ex 64 raise ex
63 finally: 65 finally:
64 self._check_if_all_messages_was_publish(num_of_msgs, sent_count) 66 self._check_if_all_messages_was_publish(num_of_msgs, sent_count)
67
68 def _transform_message(self, event_element, event_project_id):
69 """Transform the message
70
71 :param dict event_element: original event element
72 :param str event_project_id: project id
73 :return: message payload
74 """
75 try:
76 msg_json = rest_utils.as_json(event_element)
77 msg_json = encodeutils.safe_encode(msg_json, 'utf-8')
78
79 event_envelope = envelope.Envelope.new_envelope(
80 event=msg_json,
81 project_id=event_project_id,
82 )
83
84 msg_payload = (super(EventsBulkProcessor, self)
85 ._transform_message(event_envelope))
86 return msg_payload
87
88 except Exception as ex:
89 LOG.error("Event transformation failed, rejecting event")
90 LOG.exception(ex)
91 return None
diff --git a/monasca_events_api/app/controller/v1/events.py b/monasca_events_api/app/controller/v1/events.py
index a938f51..5cf22c0 100644
--- a/monasca_events_api/app/controller/v1/events.py
+++ b/monasca_events_api/app/controller/v1/events.py
@@ -1,4 +1,4 @@
1# Copyright 2017 FUJITSU LIMITED 1# Copyright 2018 FUJITSU LIMITED
2# 2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may 3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain 4# not use this file except in compliance with the License. You may obtain
@@ -53,9 +53,10 @@ class Events(object):
53 try: 53 try:
54 request_body = helpers.read_json_msg_body(req) 54 request_body = helpers.read_json_msg_body(req)
55 req.can(policy_action) 55 req.can(policy_action)
56 project_id = req.project_id
56 body_validation.validate_body(request_body) 57 body_validation.validate_body(request_body)
57 messages = prepare_message_to_sent(request_body) 58 messages = prepare_message_to_sent(request_body)
58 self._processor.send_message(messages) 59 self._processor.send_message(messages, event_project_id=project_id)
59 res.status = falcon.HTTP_200 60 res.status = falcon.HTTP_200
60 except MultipleInvalid as ex: 61 except MultipleInvalid as ex:
61 LOG.error('Entire bulk package was rejected, unsupported body') 62 LOG.error('Entire bulk package was rejected, unsupported body')
diff --git a/monasca_events_api/app/core/error_handlers.py b/monasca_events_api/app/core/error_handlers.py
index 1e4868c..c9f2d11 100644
--- a/monasca_events_api/app/core/error_handlers.py
+++ b/monasca_events_api/app/core/error_handlers.py
@@ -1,4 +1,4 @@
1# Copyright 2017 FUJITSU LIMITED 1# Copyright 2018 FUJITSU LIMITED
2# 2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may 3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain 4# not use this file except in compliance with the License. You may obtain
@@ -25,5 +25,5 @@ def events_envelope_exception_handlet(ex, req, resp, params):
25 25
26 26
27def register_error_handler(app): 27def register_error_handler(app):
28 app.add_error_handler(envelope.EventsEnvelopeException, 28 app.add_error_handler(envelope.EventEnvelopeException,
29 events_envelope_exception_handlet) 29 events_envelope_exception_handlet)
diff --git a/monasca_events_api/app/core/model.py b/monasca_events_api/app/core/model.py
index bbb6304..dac04d3 100644
--- a/monasca_events_api/app/core/model.py
+++ b/monasca_events_api/app/core/model.py
@@ -1,4 +1,4 @@
1# Copyright 2017 FUJITSU LIMITED 1# Copyright 2018 FUJITSU LIMITED
2# 2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may 3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain 4# not use this file except in compliance with the License. You may obtain
@@ -24,5 +24,6 @@ def prepare_message_to_sent(body):
24 for events in body['events']: 24 for events in body['events']:
25 ev = events['event'].copy() 25 ev = events['event'].copy()
26 ev.update({'timestamp': timestamp}) 26 ev.update({'timestamp': timestamp})
27 ev.update({'dimensions': events.get('dimensions')})
27 final_body.append(ev) 28 final_body.append(ev)
28 return final_body 29 return final_body
diff --git a/monasca_events_api/app/core/request.py b/monasca_events_api/app/core/request.py
index a065678..ca7b2e7 100644
--- a/monasca_events_api/app/core/request.py
+++ b/monasca_events_api/app/core/request.py
@@ -1,4 +1,4 @@
1# Copyright 2017 FUJITSU LIMITED 1# Copyright 2018 FUJITSU LIMITED
2# 2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may 3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain 4# not use this file except in compliance with the License. You may obtain
@@ -36,6 +36,7 @@ class Request(falcon.Request):
36 self.context = \ 36 self.context = \
37 request_contex.RequestContext.from_environ(self.env) 37 request_contex.RequestContext.from_environ(self.env)
38 self.is_admin = policy.check_is_admin(self.context) 38 self.is_admin = policy.check_is_admin(self.context)
39 self.project_id = self.context.project_id
39 40
40 def can(self, action, target=None): 41 def can(self, action, target=None):
41 return self.context.can(action, target) 42 return self.context.can(action, target)
diff --git a/monasca_events_api/app/model/envelope.py b/monasca_events_api/app/model/envelope.py
index 9739c5d..84d320f 100644
--- a/monasca_events_api/app/model/envelope.py
+++ b/monasca_events_api/app/model/envelope.py
@@ -1,4 +1,4 @@
1# Copyright 2017 FUJITSU LIMITED 1# Copyright 2018 FUJITSU LIMITED
2# 2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may 3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain 4# not use this file except in compliance with the License. You may obtain
@@ -12,6 +12,91 @@
12# License for the specific language governing permissions and limitations 12# License for the specific language governing permissions and limitations
13# under the License. 13# under the License.
14 14
15from monasca_common.rest import utils as rest_utils
16from oslo_utils import encodeutils
17from oslo_utils import timeutils
15 18
16class EventsEnvelopeException(Exception): 19
20def serialize_envelope(envelope):
21 """Returns json representation of an envelope.
22
23 :return: json object of envelope
24 :rtype: json or a bytestring `encoding` encoded
25 representation of it.
26
27 """
28 json = rest_utils.as_json(envelope, ensure_ascii=False)
29 return encodeutils.safe_decode(json, 'utf-8')
30
31
32class EventEnvelopeException(Exception):
17 pass 33 pass
34
35
36class Envelope(dict):
37 def __init__(self, event, meta):
38 if not event:
39 error_msg = 'Envelope cannot be created without event'
40 raise EventEnvelopeException(error_msg)
41 if 'project_id' not in meta or not meta.get('project_id'):
42 error_msg = 'Envelope cannot be created without project_id'
43 raise EventEnvelopeException(error_msg)
44
45 creation_time = self._get_creation_time()
46 super(Envelope, self).__init__(
47 event=rest_utils.from_json(event),
48 creation_time=creation_time,
49 meta=meta
50 )
51
52 @staticmethod
53 def _get_creation_time():
54 return timeutils.utcnow_ts()
55
56 @classmethod
57 def new_envelope(cls, event, project_id):
58 """Creates new event envelope
59
60 Event envelope is combined of of following properties
61
62 * event - dict
63 * creation_time - timestamp
64 * meta - meta block
65
66 Example output json would be like this:
67
68 .. code-block:: json
69
70 {
71 "event": {
72 "message": "Some message",
73 "dimensions": {
74 "hostname": "devstack"
75 }
76 },
77 "creation_time": 1447834886,
78 "meta": {
79 "project_id": "e4bd29509eda473092d32aadfee3e7b1",
80 }
81 }
82
83 :param dict event: original event element
84 :param str project_id: project id to be put in meta field
85 """
86 event_meta = {
87 'project_id': project_id
88 }
89
90 return cls(event, event_meta)
91
92 @property
93 def event(self):
94 return self.get('event', None)
95
96 @property
97 def creation_time(self):
98 return self.get('creation_time', None)
99
100 @property
101 def meta(self):
102 return self.get('meta', None)
diff --git a/monasca_events_api/tests/unit/test_body_valodiation.py b/monasca_events_api/tests/unit/test_body_validation.py
index c07e5fb..c07e5fb 100644
--- a/monasca_events_api/tests/unit/test_body_valodiation.py
+++ b/monasca_events_api/tests/unit/test_body_validation.py