Add Elasticsearch driver

Change-Id: Idca6ff8a946ec46c65fab57f10cd17ebcdc8bc3e
This commit is contained in:
Alexander Ignatyev 2016-07-12 16:15:17 +03:00
parent 8027a1d3b7
commit bcea148068
7 changed files with 306 additions and 2 deletions

View File

@ -23,16 +23,20 @@ import sys
import argparse
from oslo_config import cfg
import osprofiler
from osprofiler.cmd import cliutils
from osprofiler.cmd import commands
from osprofiler import exc
from osprofiler import opts
class OSProfilerShell(object):
def __init__(self, argv):
args = self._get_base_parser().parse_args(argv)
opts.set_defaults(cfg.CONF)
if not (args.os_auth_token and args.ceilometer_url):
if not args.os_username:

View File

@ -1,4 +1,5 @@
from osprofiler.drivers import base # noqa
from osprofiler.drivers import ceilometer # noqa
from osprofiler.drivers import elasticsearch_driver # noqa
from osprofiler.drivers import messaging # noqa
from osprofiler.drivers import mongodb # noqa

View File

@ -0,0 +1,136 @@
# Copyright 2016 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six.moves.urllib.parse as parser
from oslo_config import cfg
from osprofiler.drivers import base
from osprofiler import exc
class ElasticsearchDriver(base.Driver):
def __init__(self, connection_str, index_name="osprofiler-notifications",
project=None, service=None, host=None, conf=cfg.CONF,
**kwargs):
"""Elasticsearch driver for OSProfiler."""
super(ElasticsearchDriver, self).__init__(connection_str,
project=project,
service=service, host=host)
try:
from elasticsearch import Elasticsearch
except ImportError:
raise exc.CommandError(
"To use this command, you should install "
"'elasticsearch' manually. Use command:\n "
"'pip install elasticsearch'.")
client_url = parser.urlunparse(parser.urlparse(self.connection_str)
._replace(scheme="http"))
self.conf = conf
self.client = Elasticsearch(client_url)
self.index_name = index_name
@classmethod
def get_name(cls):
return "elasticsearch"
def notify(self, info):
"""Send notifications to Elasticsearch.
:param info: Contains information about trace element.
In payload dict there are always 3 ids:
"base_id" - uuid that is common for all notifications
related to one trace. Used to simplify
retrieving of all trace elements from
Elasticsearch.
"parent_id" - uuid of parent element in trace
"trace_id" - uuid of current element in trace
With parent_id and trace_id it's quite simple to build
tree of trace elements, which simplify analyze of trace.
"""
info = info.copy()
info["project"] = self.project
info["service"] = self.service
self.client.index(index=self.index_name,
doc_type=self.conf.profiler.es_doc_type, body=info)
def _hits(self, response):
"""Returns all hits of search query using scrolling
:param response: ElasticSearch query response
"""
scroll_id = response["_scroll_id"]
scroll_size = len(response["hits"]["hits"])
result = []
while scroll_size > 0:
for hit in response["hits"]["hits"]:
result.append(hit["_source"])
response = self.client.scroll(scroll_id=scroll_id,
scroll=self.conf.profiler.
es_scroll_time)
scroll_id = response["_scroll_id"]
scroll_size = len(response["hits"]["hits"])
return result
def list_traces(self, query={"match_all": {}}, fields=[]):
"""Returns array of all base_id fields that match the given criteria
:param query: dict that specifies the query criteria
:param fields: iterable of strings that specifies the output fields
"""
for base_field in ["base_id", "timestamp"]:
if base_field not in fields:
fields.append(base_field)
response = self.client.search(index=self.index_name,
doc_type=self.conf.profiler.es_doc_type,
size=self.conf.profiler.es_scroll_size,
scroll=self.conf.profiler.es_scroll_time,
body={"_source": fields, "query": query,
"sort": [{"timestamp": "asc"}]})
return self._hits(response)
def get_report(self, base_id):
"""Retrieves and parses notification from Elasticsearch.
:param base_id: Base id of trace elements.
"""
response = self.client.search(index=self.index_name,
doc_type=self.conf.profiler.es_doc_type,
size=self.conf.profiler.es_scroll_size,
scroll=self.conf.profiler.es_scroll_time,
body={"query": {
"match": {"base_id": base_id}}})
for n in self._hits(response):
trace_id = n["trace_id"]
parent_id = n["parent_id"]
name = n["name"]
project = n["project"]
service = n["service"]
host = n["info"]["host"]
timestamp = n["timestamp"]
self._append_results(trace_id, parent_id, name, project, service,
host, timestamp, n)
return self._parse_results()

View File

@ -37,6 +37,7 @@ def init_from_conf(conf, context, project, service, host):
transport=oslo_messaging.get_transport(conf),
project=project,
service=service,
host=host)
host=host,
conf=conf)
notifier.set(_notifier)
web.enable(conf.profiler.hmac_keys)

View File

@ -88,6 +88,33 @@ sets the notifier to oslo_messaging.
Examples of possible values:
* messaging://: use oslo_messaging driver for sending notifications.
* mongodb://127.0.0.1:27017 : use mongodb driver for sending notifications.
* elasticsearch://127.0.0.1:9200 : use elasticsearch driver for sending
notifications.
""")
_es_doc_type_opt = cfg.StrOpt(
"es_doc_type",
default="notification",
help="""
Document type for notification indexing in elasticsearch.
""")
_es_scroll_time_opt = cfg.StrOpt(
"es_scroll_time",
default="2m",
help="""
This parameter is a time value parameter (for example: es_scroll_time=2m),
indicating for how long the nodes that participate in the search will maintain
relevant resources in order to continue and support it.
""")
_es_scroll_size_opt = cfg.IntOpt(
"es_scroll_size",
default=10000,
help="""
Elasticsearch splits large requests in batches. This parameter defines
maximum size of each batch (for example: es_scroll_size=10000).
""")
@ -96,11 +123,17 @@ _PROFILER_OPTS = [
_trace_sqlalchemy_opt,
_hmac_keys_opt,
_connection_string_opt,
_es_doc_type_opt,
_es_scroll_time_opt,
_es_scroll_size_opt
]
cfg.CONF.register_opts(_PROFILER_OPTS, group=_profiler_opt_group)
def set_defaults(conf, enabled=None, trace_sqlalchemy=None, hmac_keys=None,
connection_string=None):
connection_string=None, es_doc_type=None,
es_scroll_time=None, es_scroll_size=None):
conf.register_opts(_PROFILER_OPTS, group=_profiler_opt_group)
if enabled is not None:
@ -117,6 +150,18 @@ def set_defaults(conf, enabled=None, trace_sqlalchemy=None, hmac_keys=None,
conf.set_default("connection_string", connection_string,
group=_profiler_opt_group.name)
if es_doc_type is not None:
conf.set_default("es_doc_type", es_doc_type,
group=_profiler_opt_group.name)
if es_scroll_time is not None:
conf.set_default("es_scroll_time", es_scroll_time,
group=_profiler_opt_group.name)
if es_scroll_size is not None:
conf.set_default("es_scroll_size", es_scroll_size,
group=_profiler_opt_group.name)
def is_trace_enabled(conf=None):
if conf is None:

View File

@ -0,0 +1,114 @@
# Copyright 2016 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from osprofiler.drivers.elasticsearch_driver import ElasticsearchDriver
from osprofiler.tests import test
class ElasticsearchTestCase(test.TestCase):
def setUp(self):
super(ElasticsearchTestCase, self).setUp()
self.elasticsearch = ElasticsearchDriver("elasticsearch://localhost")
self.elasticsearch.project = "project"
self.elasticsearch.service = "service"
def test_init_and_notify(self):
self.elasticsearch.client = mock.MagicMock()
self.elasticsearch.client.reset_mock()
project = "project"
service = "service"
host = "host"
info = {
"a": 10,
"project": project,
"service": service,
"host": host
}
self.elasticsearch.notify(info)
self.elasticsearch.client\
.index.assert_called_once_with(index="osprofiler-notifications",
doc_type="notification",
body=info)
def test_get_empty_report(self):
self.elasticsearch.client = mock.MagicMock()
self.elasticsearch.client.search = mock\
.MagicMock(return_value={"_scroll_id": "1", "hits": {"hits": []}})
self.elasticsearch.client.reset_mock()
get_report = self.elasticsearch.get_report
base_id = "abacaba"
get_report(base_id)
self.elasticsearch.client\
.search.assert_called_once_with(index="osprofiler-notifications",
doc_type="notification",
size=10000,
scroll="2m",
body={"query": {
"match": {"base_id": base_id}}
})
def test_get_non_empty_report(self):
base_id = "1"
elasticsearch_first_response = {
"_scroll_id": "1",
"hits": {
"hits": [
{
"_source": {
"timestamp": "2016-08-10T16:58:03.064438",
"base_id": base_id,
"project": "project",
"service": "service",
"parent_id": "0",
"name": "test",
"info": {
"host": "host"
},
"trace_id": "1"
}
}
]}}
elasticsearch_second_response = {
"_scroll_id": base_id,
"hits": {"hits": []}}
self.elasticsearch.client = mock.MagicMock()
self.elasticsearch.client.search = \
mock.MagicMock(return_value=elasticsearch_first_response)
self.elasticsearch.client.scroll = \
mock.MagicMock(return_value=elasticsearch_second_response)
self.elasticsearch.client.reset_mock()
self.elasticsearch.get_report(base_id)
self.elasticsearch.client\
.search.assert_called_once_with(index="osprofiler-notifications",
doc_type="notification",
size=10000,
scroll="2m",
body={"query": {
"match": {"base_id": base_id}}
})
self.elasticsearch.client\
.scroll.assert_called_once_with(scroll_id=base_id, scroll="2m")

View File

@ -14,3 +14,6 @@ bandit>=0.17.3 # Apache-2.0
python-ceilometerclient>=2.2.1 # Apache-2.0
pymongo>=3.0.2,!=3.1 # Apache-2.0
# Elasticsearch python client
elasticsearch>=2.0.0,<=3.0.0 # Apache-2.0