Filter traces that contain error/exception
Adds ability to filter error traces to OSProfiler. Adds command for list error traces. Change-Id: I0ec97337cae5e573fedae2b6665aa73a73fe3654
This commit is contained in:
parent
41ad0ea221
commit
113a2ddddd
|
@ -155,11 +155,15 @@ class TraceCommands(BaseCommand):
|
|||
return dot
|
||||
|
||||
@cliutils.arg("--connection-string", dest="conn_str",
|
||||
default=(cliutils.env("OSPROFILER_CONNECTION_STRING")),
|
||||
default=cliutils.env("OSPROFILER_CONNECTION_STRING"),
|
||||
required=True,
|
||||
help="Storage driver's connection string. Defaults to "
|
||||
"env[OSPROFILER_CONNECTION_STRING] if set")
|
||||
@cliutils.arg("--error-trace", dest="error_trace",
|
||||
type=bool, default=False,
|
||||
help="List all traces that contain error.")
|
||||
def list(self, args):
|
||||
"""List all traces"""
|
||||
try:
|
||||
engine = base.get_driver(args.conn_str, **args.__dict__)
|
||||
except Exception as e:
|
||||
|
@ -168,7 +172,10 @@ class TraceCommands(BaseCommand):
|
|||
fields = ("base_id", "timestamp")
|
||||
pretty_table = prettytable.PrettyTable(fields)
|
||||
pretty_table.align = "l"
|
||||
traces = engine.list_traces(fields)
|
||||
if not args.error_trace:
|
||||
traces = engine.list_traces(fields)
|
||||
else:
|
||||
traces = engine.list_error_traces()
|
||||
for trace in traces:
|
||||
row = [trace[field] for field in fields]
|
||||
pretty_table.add_row(row)
|
||||
|
|
|
@ -55,7 +55,8 @@ class Driver(object):
|
|||
|
||||
default_trace_fields = {"base_id", "timestamp"}
|
||||
|
||||
def __init__(self, connection_str, project=None, service=None, host=None):
|
||||
def __init__(self, connection_str, project=None, service=None, host=None,
|
||||
**kwargs):
|
||||
self.connection_str = connection_str
|
||||
self.project = project
|
||||
self.service = service
|
||||
|
@ -66,6 +67,12 @@ class Driver(object):
|
|||
# Last trace started time
|
||||
self.last_started_at = None
|
||||
|
||||
profiler_config = kwargs.get("conf", {}).get("profiler", {})
|
||||
if hasattr(profiler_config, "filter_error_trace"):
|
||||
self.filter_error_trace = profiler_config.filter_error_trace
|
||||
else:
|
||||
self.filter_error_trace = False
|
||||
|
||||
def notify(self, info, **kwargs):
|
||||
"""This method will be called on each notifier.notify() call.
|
||||
|
||||
|
@ -115,6 +122,16 @@ class Driver(object):
|
|||
"or has to be overridden".format(
|
||||
self.get_name()))
|
||||
|
||||
def list_error_traces(self):
|
||||
"""Query all error traces from the storage.
|
||||
|
||||
:return List of traces, where each trace is a dictionary containing
|
||||
`base_id` and `timestamp`.
|
||||
"""
|
||||
raise NotImplementedError("{0}: This method is either not supported "
|
||||
"or has to be overridden".format(
|
||||
self.get_name()))
|
||||
|
||||
@staticmethod
|
||||
def _build_tree(nodes):
|
||||
"""Builds the tree (forest) data structure based on the list of nodes.
|
||||
|
|
|
@ -28,7 +28,10 @@ class ElasticsearchDriver(base.Driver):
|
|||
|
||||
super(ElasticsearchDriver, self).__init__(connection_str,
|
||||
project=project,
|
||||
service=service, host=host)
|
||||
service=service,
|
||||
host=host,
|
||||
conf=conf,
|
||||
**kwargs)
|
||||
try:
|
||||
from elasticsearch import Elasticsearch
|
||||
except ImportError:
|
||||
|
@ -42,6 +45,7 @@ class ElasticsearchDriver(base.Driver):
|
|||
self.conf = conf
|
||||
self.client = Elasticsearch(client_url)
|
||||
self.index_name = index_name
|
||||
self.index_name_error = "osprofiler-notifications-error"
|
||||
|
||||
@classmethod
|
||||
def get_name(cls):
|
||||
|
@ -70,6 +74,18 @@ class ElasticsearchDriver(base.Driver):
|
|||
self.client.index(index=self.index_name,
|
||||
doc_type=self.conf.profiler.es_doc_type, body=info)
|
||||
|
||||
if (self.filter_error_trace
|
||||
and info.get("info", {}).get("etype") is not None):
|
||||
self.notify_error_trace(info)
|
||||
|
||||
def notify_error_trace(self, info):
|
||||
"""Store base_id and timestamp of error trace to a separate index."""
|
||||
self.client.index(
|
||||
index=self.index_name_error,
|
||||
doc_type=self.conf.profiler.es_doc_type,
|
||||
body={"base_id": info["base_id"], "timestamp": info["timestamp"]}
|
||||
)
|
||||
|
||||
def _hits(self, response):
|
||||
"""Returns all hits of search query using scrolling
|
||||
|
||||
|
@ -91,10 +107,12 @@ class ElasticsearchDriver(base.Driver):
|
|||
return result
|
||||
|
||||
def list_traces(self, fields=None):
|
||||
"""Returns array of all base_id fields that match the given criteria
|
||||
"""Query all traces from the storage.
|
||||
|
||||
:param query: dict that specifies the query criteria
|
||||
:param fields: iterable of strings that specifies the output fields
|
||||
:param fields: Set of trace fields to return. Defaults to 'base_id'
|
||||
and 'timestamp'
|
||||
:return List of traces, where each trace is a dictionary containing
|
||||
at least `base_id` and `timestamp`.
|
||||
"""
|
||||
query = {"match_all": {}}
|
||||
fields = set(fields or self.default_trace_fields)
|
||||
|
@ -108,6 +126,22 @@ class ElasticsearchDriver(base.Driver):
|
|||
|
||||
return self._hits(response)
|
||||
|
||||
def list_error_traces(self):
|
||||
"""Returns all traces that have error/exception."""
|
||||
response = self.client.search(
|
||||
index=self.index_name_error,
|
||||
doc_type=self.conf.profiler.es_doc_type,
|
||||
size=self.conf.profiler.es_scroll_size,
|
||||
scroll=self.conf.profiler.es_scroll_time,
|
||||
body={
|
||||
"_source": self.default_trace_fields,
|
||||
"query": {"match_all": {}},
|
||||
"sort": [{"timestamp": "asc"}]
|
||||
}
|
||||
)
|
||||
|
||||
return self._hits(response)
|
||||
|
||||
def get_report(self, base_id):
|
||||
"""Retrieves and parses notification from Elasticsearch.
|
||||
|
||||
|
|
|
@ -23,7 +23,7 @@ class MongoDB(base.Driver):
|
|||
"""MongoDB driver for OSProfiler."""
|
||||
|
||||
super(MongoDB, self).__init__(connection_str, project=project,
|
||||
service=service, host=host)
|
||||
service=service, host=host, **kwargs)
|
||||
try:
|
||||
from pymongo import MongoClient
|
||||
except ImportError:
|
||||
|
@ -60,6 +60,18 @@ class MongoDB(base.Driver):
|
|||
data["service"] = self.service
|
||||
self.db.profiler.insert_one(data)
|
||||
|
||||
if (self.filter_error_trace
|
||||
and data.get("info", {}).get("etype") is not None):
|
||||
self.notify_error_trace(data)
|
||||
|
||||
def notify_error_trace(self, data):
|
||||
"""Store base_id and timestamp of error trace to a separate db."""
|
||||
self.db.profiler_error.update(
|
||||
{"base_id": data["base_id"]},
|
||||
{"base_id": data["base_id"], "timestamp": data["timestamp"]},
|
||||
upsert=True
|
||||
)
|
||||
|
||||
def list_traces(self, fields=None):
|
||||
"""Query all traces from the storage.
|
||||
|
||||
|
@ -75,6 +87,11 @@ class MongoDB(base.Driver):
|
|||
return [self.db.profiler.find(
|
||||
{"base_id": i}, out_format).sort("timestamp")[0] for i in ids]
|
||||
|
||||
def list_error_traces(self):
|
||||
"""Returns all traces that have error/exception."""
|
||||
out_format = {"base_id": 1, "timestamp": 1, "_id": 0}
|
||||
return self.db.profiler_error.find({}, out_format)
|
||||
|
||||
def get_report(self, base_id):
|
||||
"""Retrieves and parses notification from MongoDB.
|
||||
|
||||
|
|
|
@ -24,11 +24,12 @@ from osprofiler import exc
|
|||
|
||||
class Redis(base.Driver):
|
||||
def __init__(self, connection_str, db=0, project=None,
|
||||
service=None, host=None, **kwargs):
|
||||
service=None, host=None, conf=cfg.CONF, **kwargs):
|
||||
"""Redis driver for OSProfiler."""
|
||||
|
||||
super(Redis, self).__init__(connection_str, project=project,
|
||||
service=service, host=host)
|
||||
service=service, host=host,
|
||||
conf=conf, **kwargs)
|
||||
try:
|
||||
from redis import StrictRedis
|
||||
except ImportError:
|
||||
|
@ -42,6 +43,7 @@ class Redis(base.Driver):
|
|||
port=parsed_url.port,
|
||||
db=db)
|
||||
self.namespace = "osprofiler:"
|
||||
self.namespace_error = "osprofiler_error:"
|
||||
|
||||
@classmethod
|
||||
def get_name(cls):
|
||||
|
@ -70,6 +72,19 @@ class Redis(base.Driver):
|
|||
data["timestamp"]
|
||||
self.db.set(key, jsonutils.dumps(data))
|
||||
|
||||
if (self.filter_error_trace
|
||||
and data.get("info", {}).get("etype") is not None):
|
||||
self.notify_error_trace(data)
|
||||
|
||||
def notify_error_trace(self, data):
|
||||
"""Store base_id and timestamp of error trace to a separate key."""
|
||||
key = self.namespace_error + data["base_id"]
|
||||
value = jsonutils.dumps({
|
||||
"base_id": data["base_id"],
|
||||
"timestamp": data["timestamp"]
|
||||
})
|
||||
self.db.set(key, value)
|
||||
|
||||
def list_traces(self, fields=None):
|
||||
"""Query all traces from the storage.
|
||||
|
||||
|
@ -95,6 +110,20 @@ class Redis(base.Driver):
|
|||
if key in fields})
|
||||
return result
|
||||
|
||||
def list_error_traces(self):
|
||||
"""Returns all traces that have error/exception."""
|
||||
ids = self.db.scan_iter(match=self.namespace_error + "*")
|
||||
traces = [jsonutils.loads(self.db.get(i)) for i in ids]
|
||||
traces.sort(key=lambda x: x["timestamp"])
|
||||
seen_ids = set()
|
||||
result = []
|
||||
for trace in traces:
|
||||
if trace["base_id"] not in seen_ids:
|
||||
seen_ids.add(trace["base_id"])
|
||||
result.append(trace)
|
||||
|
||||
return result
|
||||
|
||||
def get_report(self, base_id):
|
||||
"""Retrieves and parses notification from Redis.
|
||||
|
||||
|
@ -123,7 +152,8 @@ class RedisSentinel(Redis, base.Driver):
|
|||
"""Redis driver for OSProfiler."""
|
||||
|
||||
super(RedisSentinel, self).__init__(connection_str, project=project,
|
||||
service=service, host=host)
|
||||
service=service, host=host,
|
||||
conf=conf, **kwargs)
|
||||
try:
|
||||
from redis.sentinel import Sentinel
|
||||
except ImportError:
|
||||
|
|
|
@ -17,7 +17,7 @@ from osprofiler import notifier
|
|||
from osprofiler import web
|
||||
|
||||
|
||||
def init_from_conf(conf, context, project, service, host):
|
||||
def init_from_conf(conf, context, project, service, host, **kwargs):
|
||||
"""Initialize notifier from service configuration
|
||||
|
||||
:param conf: service configuration
|
||||
|
@ -26,6 +26,7 @@ def init_from_conf(conf, context, project, service, host):
|
|||
:param service: service name that will be profiled
|
||||
:param host: hostname or host IP address that the service will be
|
||||
running on.
|
||||
:param kwargs: other arguments for notifier creation
|
||||
"""
|
||||
connection_str = conf.profiler.connection_string
|
||||
_notifier = notifier.create(
|
||||
|
@ -34,6 +35,7 @@ def init_from_conf(conf, context, project, service, host):
|
|||
project=project,
|
||||
service=service,
|
||||
host=host,
|
||||
conf=conf)
|
||||
conf=conf,
|
||||
**kwargs)
|
||||
notifier.set(_notifier)
|
||||
web.enable(conf.profiler.hmac_keys)
|
||||
|
|
|
@ -134,6 +134,18 @@ This parameter defines the name (for example:
|
|||
sentinal_service_name=mymaster).
|
||||
""")
|
||||
|
||||
_filter_error_trace = cfg.BoolOpt(
|
||||
"filter_error_trace",
|
||||
default=False,
|
||||
help="""
|
||||
Enable filter traces that contain error/exception to a separated place.
|
||||
Default value is set to False.
|
||||
|
||||
Possible values:
|
||||
|
||||
* True: Enable filter traces that contain error/exception.
|
||||
* False: Disable the filter.
|
||||
""")
|
||||
|
||||
_PROFILER_OPTS = [
|
||||
_enabled_opt,
|
||||
|
@ -144,7 +156,8 @@ _PROFILER_OPTS = [
|
|||
_es_scroll_time_opt,
|
||||
_es_scroll_size_opt,
|
||||
_socket_timeout_opt,
|
||||
_sentinel_service_name_opt
|
||||
_sentinel_service_name_opt,
|
||||
_filter_error_trace
|
||||
]
|
||||
|
||||
cfg.CONF.register_opts(_PROFILER_OPTS, group=_profiler_opt_group)
|
||||
|
|
Loading…
Reference in New Issue