Merge pull request #4 from hpcloud-mon/feature/batching
Feature/batching
This commit is contained in:
commit
5576423117
|
@ -108,3 +108,17 @@ class KafkaPublisher(publisher.Publisher):
|
|||
except Exception:
|
||||
LOG.exception('Unknown error.')
|
||||
raise exceptions.MessageQueueException()
|
||||
|
||||
def send_message_batch(self, messages):
|
||||
try:
|
||||
if not self._producer:
|
||||
self._init_producer()
|
||||
self._producer.send_messages(self.topic, *messages)
|
||||
except (common.KafkaUnavailableError,
|
||||
common.LeaderNotAvailableError):
|
||||
self._client = None
|
||||
LOG.exception('Error occurred while posting data to Kafka.')
|
||||
raise exceptions.MessageQueueException()
|
||||
except Exception:
|
||||
LOG.exception('Unknown error.')
|
||||
raise exceptions.MessageQueueException()
|
||||
|
|
|
@ -12,17 +12,24 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
|
||||
from oslo_utils import timeutils
|
||||
|
||||
|
||||
def transform(event, tenant_id, region):
|
||||
transformed_event = dict(
|
||||
event=event,
|
||||
meta=dict(
|
||||
tenantId=tenant_id,
|
||||
region=region
|
||||
),
|
||||
creation_time=timeutils.utcnow_ts()
|
||||
)
|
||||
def transform(events, tenant_id, region):
|
||||
event_template = {'event': {},
|
||||
'_tenant_id': tenant_id,
|
||||
'meta': {'tenantId': tenant_id, 'region': region},
|
||||
'creation_time': timeutils.utcnow_ts()}
|
||||
|
||||
return transformed_event
|
||||
if isinstance(events, list):
|
||||
transformed_events = []
|
||||
for event in events:
|
||||
event_template['event'] = event
|
||||
transformed_events.append(json.dumps(event_template))
|
||||
return transformed_events
|
||||
else:
|
||||
transformed_event = event_template['event']
|
||||
transformed_event['event'] = events
|
||||
return [json.dumps(transformed_event)]
|
||||
|
|
|
@ -33,8 +33,11 @@ event_schema = {
|
|||
voluptuous.Length(max=50)),
|
||||
voluptuous.Required('timestamp'): DateValidator()}
|
||||
|
||||
request_body_schema = voluptuous.Schema(event_schema,
|
||||
required=True, extra=True)
|
||||
event_schema = voluptuous.Schema(event_schema,
|
||||
required=True, extra=True)
|
||||
|
||||
request_body_schema = voluptuous.Schema(
|
||||
voluptuous.Any(event_schema, [event_schema]))
|
||||
|
||||
|
||||
def validate(body):
|
||||
|
|
|
@ -12,8 +12,6 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
|
||||
import falcon
|
||||
|
||||
import collections
|
||||
|
@ -32,7 +30,6 @@ from monasca_events_api.v2.common.schemas import (
|
|||
events_request_body_schema as schemas_event)
|
||||
from monasca_events_api.v2.common.schemas import (
|
||||
exceptions as schemas_exceptions)
|
||||
from monasca_events_api.v2.common import utils
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
|
@ -83,9 +80,9 @@ class Events(events_api_v2.EventsV2API):
|
|||
helpers.validate_json_content_type(req)
|
||||
helpers.validate_authorization(req, self._post_events_authorized_roles)
|
||||
event = helpers.read_http_resource(req)
|
||||
|
||||
self._validate_event(event)
|
||||
tenant_id = helpers.get_tenant_id(req)
|
||||
event['_tenant_id'] = tenant_id
|
||||
transformed_event = self._event_transform(event, tenant_id,
|
||||
self._region)
|
||||
self._send_event(transformed_event)
|
||||
|
@ -106,16 +103,14 @@ class Events(events_api_v2.EventsV2API):
|
|||
LOG.debug(ex)
|
||||
raise falcon.HTTPBadRequest('Bad request', ex.message)
|
||||
|
||||
def _send_event(self, event):
|
||||
def _send_event(self, events):
|
||||
"""Send the event using the message queue.
|
||||
|
||||
:param metrics: An event object.
|
||||
:param metrics: A series of event objects.
|
||||
:raises: falcon.HTTPServiceUnavailable
|
||||
"""
|
||||
try:
|
||||
str_msg = json.dumps(event, default=utils.date_handler,
|
||||
ensure_ascii=False).encode('utf8')
|
||||
self._message_queue.send_message(str_msg)
|
||||
self._message_queue.send_message_batch(events)
|
||||
except message_queue_exceptions.MessageQueueException as ex:
|
||||
LOG.exception(ex)
|
||||
raise falcon.HTTPInternalServerError('Service unavailable',
|
||||
|
|
Loading…
Reference in New Issue