# Copyright Samsung Electronics 2013. All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""Elastic search wrapper to make handling results easier."""

import calendar
import copy
import datetime
import pprint

import dateutil.parser as dp
import pyelasticsearch
import pytz


pp = pprint.PrettyPrinter()


class SearchEngine(object):
    """Wrapper for pyelasticsearch so that it returns result sets."""

    def __init__(self, url, indexfmt='logstash-%Y.%m.%d'):
        """Remember the server url and the strftime format of index names."""
        self._url = url
        self._indexfmt = indexfmt
        # Maps index name -> True for indexes already confirmed to exist.
        self.index_cache = {}

    def _is_valid_index(self, es, index):
        """Return True if `index` exists on the server behind `es`.

        Only positive answers are cached; a missing index is asked about
        again on the next call.
        """
        if index in self.index_cache:
            return self.index_cache[index]

        try:
            es.status(index=index)
        except pyelasticsearch.exceptions.ElasticHttpNotFoundError:
            return False
        self.index_cache[index] = True
        return True

    def _recent_indexes(self, es, recent, days):
        """Build the ordered, duplicate free list of indexes to search.

        The candidates are today's index, the index of one hour ago (when
        `recent` is set) and the indexes of the previous `days - 1` days.
        Only indexes that exist on the server are returned.
        """
        datefmt = self._indexfmt
        now = datetime.datetime.utcnow()

        moments = [now]
        if recent:
            moments.append(now - datetime.timedelta(hours=1))
        for day in range(1, days):
            moments.append(now - datetime.timedelta(days=day))

        indexes = []
        seen = set()
        for moment in moments:
            name = moment.strftime(datefmt)
            # "one hour ago" can be today's or yesterday's index, so the
            # same name may show up more than once; check and list it once.
            if name in seen:
                continue
            seen.add(name)
            if self._is_valid_index(es, name):
                indexes.append(name)
        return indexes

    def search(self, query, size=1000, recent=False, days=0):
        """Search an elasticsearch server.

        `query` parameter is the complicated query structure that
        pyelasticsearch uses. More details in their documentation.

        `size` is the max number of results to return from the search
        engine. We default it to 1000 to ensure we don't lose things.
        For certain classes of queries (like faceted ones), this can actually
        be set very low, as it won't impact the facet counts.

        `recent` search only the most recent index(es), assuming this is
        basically a real time query where you only care about the last hour
        of time. Using recent dramatically reduces the load on the ES
        cluster.

        `days` search only the last number of days.

        The returned result is a ResultSet query.
        """
        es = pyelasticsearch.ElasticSearch(self._url)
        args = {'size': size}
        if recent or days:
            args['index'] = self._recent_indexes(es, recent, days)

        results = es.search(query, **args)
        return ResultSet(results)


class ResultSet(list):
    """An easy iterator object for handling elasticsearch results.

    pyelasticsearch returns very complex result structures, and
    manipulating them directly is both ugly and error prone. The point
    of this wrapper class is to give us a container that makes working
    with pyes results more natural.

    For instance:
    ::

        results = se.search(...)
        for hit in results:
            print(hit.build_status)

    This greatly simplifies code that is interacting with search results, and
    allows us to handle some schema instability with elasticsearch, through
    adapting our __getattr__ methods.

    Design goals for ResultSet are that it is an iterator, and that all the
    data that we want to work with is mapped to a flat attribute namespace
    (pyes goes way overboard with nesting, which is fine in the general
    case, but in the elastic_recheck case is just added complexity).
    """

    def __init__(self, results=None):
        """Wrap the raw `results` dict and load its hits into the list."""
        self._results = results or {}
        if 'hits' in self._results:
            self._parse_hits(self._results['hits'])

    def _parse_hits(self, hits):
        """Append a Hit for every raw entry under hits['hits']."""
        # why, oh why elastic search
        for hit in hits['hits']:
            list.append(self, Hit(hit))

    def __getattr__(self, attr):
        """Magic __getattr__, flattens the attributes namespace.

        First search to see if a facet attribute exists by this name,
        secondly look at the top level attributes to return.

        Raises AttributeError when neither place knows `attr`.
        """
        # __getattr__ only runs when normal lookup failed. If _results
        # itself is missing (instance built without __init__, as copy and
        # pickle do) reading self._results below would recurse forever.
        if attr == '_results':
            raise AttributeError(attr)

        results = self._results
        if 'facets' in results:
            # Not every faceted query defines a facet called 'tag'.
            tag_facet = results['facets'].get('tag', {})
            if attr in tag_facet:
                return tag_facet[attr]

        if attr in results:
            return results[attr]

        raise AttributeError(attr)


class FacetSet(dict):
    """A dictionary like collection for creating faceted ResultSets.

    Elastic Search doesn't support nested facets, which are incredibly
    useful for things like faceting by build_status then by build_uuid.
    This is a client side implementation that processes a ResultSet
    with an ordered list of facets, and turns it into a data structure
    which is FacetSet -> FacetSet ... -> ResultSet (arbitrary nesting
    of FacetSets with ResultSets as the leaves).

    Treat this basically like a dictionary (which it inherits from).
    """

    @staticmethod
    def _histogram(data, facet, res=3600):
        """A preprocessor for data should we want to bucket it.

        For the "timestamp" facet the parsed time is floored to a multiple
        of `res` seconds and returned as integer milliseconds since the
        epoch. Every other facet, and a missing timestamp, is returned
        unchanged.
        """
        # NOTE(mriedem): We sometimes hit a case where the @timestamp attribute
        # is too large and ES won't return it. At some point we should probably
        # log a warning/error for these so we can clean them up.
        if facet != "timestamp" or data is None:
            return data

        ts = dp.parse(data)
        if ts.tzinfo is None:
            # A timestamp without zone information cannot be subtracted
            # from the timezone aware epoch below; treat it as UTC, which
            # is also how calendar.timegm reads the time tuple.
            ts = ts.replace(tzinfo=pytz.UTC)
        tsepoch = int(calendar.timegm(ts.timetuple()))
        # take the floor based on resolution
        ts -= datetime.timedelta(
            seconds=(tsepoch % res),
            microseconds=ts.microsecond)
        # ms since epoch
        epoch = datetime.datetime.fromtimestamp(0, pytz.UTC)
        return int(((ts - epoch).total_seconds()) * 1000)

    def detect_facets(self, results, facets, res=3600):
        """Group `results` by each name in `facets`, nesting in order.

        `results` is an iterable of hits supporting hit[facet] lookup.

        `facets` is the ordered list of attribute names to facet on. The
        list passed in is left untouched.

        `res` is the bucket size in seconds used for the "timestamp"
        facet, applied at every nesting level.
        """
        if not facets:
            return

        # Work on a copy so the caller's list is not consumed.
        remaining = copy.copy(facets)
        facet = remaining.pop(0)
        for hit in results:
            attr = self._histogram(hit[facet], facet, res)
            if attr not in self:
                self[attr] = ResultSet()
            self[attr].append(hit)

        # if we still have more facets to go, recurse down
        if remaining:
            newkeys = {}
            for key in self:
                fs = FacetSet()
                fs.detect_facets(self[key], remaining, res=res)
                newkeys[key] = fs
            self.update(newkeys)


def _first(item):
    """Collapse a list valued field to its first element.

    Non-list values are returned as they are; an empty list gives None.
    """
    if isinstance(item, list):
        # We've seen cases where the field data, like @timestamp, is
        # too large so we don't get anything back from elastic-search,
        # so skip over those.
        if len(item) > 0:
            return item[0]
        return None
    return item


class Hit(object):
    """A single search hit with its fields flattened into attributes."""

    def __init__(self, hit):
        """Wrap the raw `hit` dict returned by elasticsearch."""
        self._hit = hit

    def index(self):
        """Return the name of the index this hit came from."""
        return self._hit['_index']

    def _lookup(self, attr):
        """Return the flattened value of `attr`, or None if unknown."""
        source = self._hit['_source']
        at_attr = "@%s" % attr
        if attr in source:
            return _first(source[attr])
        if at_attr in source:
            return _first(source[at_attr])
        # New style documents have no '@fields' section at all.
        fields = source.get('@fields') or {}
        if attr in fields:
            return _first(fields[attr])
        return None

    def __getitem__(self, key):
        """Dictionary style access, same lookup rules as attributes."""
        return self._lookup(key)

    def __getattr__(self, attr):
        """flatten out our attr space into a few key types

        new style ES has
          _source[attr] for a flat space
        old style ES has
          _source['@attr'] for things like message, @timestamp
        and
          _source['@fields'][attr] for things like build_name, build_status

        also, always collapse down all attributes to singletons, because
        they might be lists if we use multiline processing (which we do
        a lot). In the general case this could be a problem, but the way
        we use logstash, there is only ever one element in these lists.

        Unknown field names give None. Special (dunder) names and the
        internal _hit slot raise AttributeError.
        """
        # Protocol probes (copy, pickle, ...) must see a missing attribute
        # as missing, not as None, and looking up _hit while it is not set
        # would recurse forever.
        if attr == '_hit' or (attr.startswith('__') and attr.endswith('__')):
            raise AttributeError(attr)
        return self._lookup(attr)

    def __repr__(self):
        """Pretty printed form of the raw hit."""
        return pp.pformat(self._hit)