slogging/slogging/log_common.py

# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import cPickle
import cStringIO
from datetime import datetime
from datetime import timedelta
from eventlet import sleep
import multiprocessing
from paste.deploy import appconfig
import Queue
from slogging.internal_proxy import InternalProxy
from swift.common.exceptions import ChunkReadTimeout
from swift.common.memcached import MemcacheRing
from swift.common.utils import get_logger
import sys
import time
import traceback
import zlib


class BadFileDownload(Exception):
def __init__(self, status_code=None):
self.status_code = status_code


class WorkerError(Exception):
def __init__(self):
self.tb_str = '' # ensure that there is always something here


class LogProcessorCommon(object):
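    """Common functionality for log processing daemons: proxied access
    to Swift, lookback window calculation, and tracking of
    already-processed log files.

    A minimal conf (illustrative values; only 'swift_account' is
    required, the rest have defaults):

        conf = {'swift_account': 'AUTH_logs',
                'memcache_servers': '127.0.0.1:11211',
                'lookback_hours': '120'}
    """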
def __init__(self, conf, logger, log_route='log-processor'):
if isinstance(logger, tuple):
self.logger = get_logger(*logger, log_route=log_route)
else:
self.logger = logger
self.memcache = MemcacheRing([
s.strip() for s in conf.get('memcache_servers', '').split(',')
if s.strip()])
self.conf = conf
self._internal_proxy = None
self.lookback_hours = int(conf.get('lookback_hours', '120'))
self.lookback_window = int(conf.get('lookback_window',
self.lookback_hours))
self.log_processor_account = conf['swift_account']
self.log_processor_container = conf.get('container_name',
'simple_billing_data')
self.processed_files_object_name = \
conf.get('processed_files_object_name',
'processed_files.pickle.gz')

    @property
def internal_proxy(self):
if self._internal_proxy is None:
# first look in the conf directly
proxy_server_conf_loc = self.conf.get('proxy_server_conf')
if proxy_server_conf_loc is None:
# then look in a section called log-processor
stats_conf = self.conf.get('log-processor', {})
proxy_server_conf_loc = stats_conf.get(
'proxy_server_conf', '/etc/swift/proxy-server.conf')
if proxy_server_conf_loc:
proxy_server_conf = appconfig(
'config:%s' % proxy_server_conf_loc,
name='proxy-server')
else:
proxy_server_conf = None
self._internal_proxy = InternalProxy(proxy_server_conf,
self.logger,
retries=3,
memcache=self.memcache)
return self._internal_proxy

    def calculate_lookback(self):
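        # Worked example (illustrative): with lookback_hours=4 and
        # lookback_window=2, at 2011-01-01 12:00 this returns
        # ('2011010108', '2011010110'): a two-hour window that starts
        # four hours in the past.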
if self.lookback_hours == 0:
lookback_start = None
lookback_end = None
else:
delta_hours = timedelta(hours=self.lookback_hours)
lookback_start = datetime.now() - delta_hours
lookback_start = lookback_start.strftime('%Y%m%d%H')
if self.lookback_window == 0:
lookback_end = None
else:
delta_window = timedelta(hours=self.lookback_window)
lookback_end = datetime.now() - delta_hours + delta_window
lookback_end = lookback_end.strftime('%Y%m%d%H')
self.logger.debug('lookback_start: %s' % lookback_start)
self.logger.debug('lookback_end: %s' % lookback_end)
return (lookback_start, lookback_end)

    def load_already_processed_files(self):
try:
# Note: this file (or data set) will grow without bound.
# In practice, if it becomes a problem (say, after many months of
# running), one could manually prune the file to remove older
# entries. Automatically pruning on each run could be dangerous.
            # There is no good way to determine when an old entry should
            # be pruned (lookback_hours could be set to anything and
            # could change).
processed_files_stream = self.get_object_data(
self.log_processor_account,
self.log_processor_container,
self.processed_files_object_name,
compressed=True)
buf = '\n'.join(x for x in processed_files_stream)
if buf:
already_processed_files = cPickle.loads(buf)
else:
already_processed_files = set()
except BadFileDownload as err:
if err.status_code == 404:
already_processed_files = set()
else:
self.logger.error(_('Simple billing unable to load list '
'of already processed log files'))
return
self.logger.debug(_('found %d processed files') %
len(already_processed_files))
return already_processed_files

    def save_processed_files(self, processed_files_data):
s = cPickle.dumps(processed_files_data, cPickle.HIGHEST_PROTOCOL)
f = cStringIO.StringIO(s)
return self.internal_proxy.upload_file(
f,
self.log_processor_account,
self.log_processor_container,
self.processed_files_object_name)

    def get_object_data(self, swift_account, container_name, object_name,
compressed=False):
        '''Read an object from Swift and yield its lines.

        Raises BadFileDownload on a non-2xx response, on bad compressed
        data, or on a chunk read timeout.
        '''
self.logger.debug('get_object_data(%r, %r, %r, compressed=%r)' % (
swift_account, container_name, object_name, compressed))
code, o = self.internal_proxy.get_object(swift_account, container_name,
object_name)
if code < 200 or code >= 300:
raise BadFileDownload(code)
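        # last_part buffers any incomplete trailing line from the
        # previous chunk so it can be completed by the next one.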
last_part = ''
        # The magic zlib.decompressobj argument (16 + zlib.MAX_WBITS)
        # tells zlib to expect a gzip header; see "Python: decompressing
        # gzip chunk-by-chunk":
        # http://stackoverflow.com/questions/2423866
d = zlib.decompressobj(16 + zlib.MAX_WBITS)
try:
for chunk in o:
if compressed:
try:
chunk = d.decompress(chunk)
except zlib.error:
self.logger.debug(_('Bad compressed data for %s') %
'/'.join((swift_account,
container_name,
object_name)))
raise BadFileDownload() # bad compressed data
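                # The final element of the split may be an incomplete
                # line; carry it over in last_part instead of yielding
                # it now.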
parts = chunk.split('\n')
parts[0] = last_part + parts[0]
for part in parts[:-1]:
yield part
last_part = parts[-1]
if last_part:
yield last_part
except ChunkReadTimeout:
raise BadFileDownload()

    def get_container_listing(self, swift_account, container_name,
start_date=None, end_date=None,
listing_filter=None):
"""Get a container listing.
Data can be filtered by start_date, end_date, and
listing_filter.
Dates, if given, must be in YYYYMMDDHH format.
"""
search_key = None
if start_date is not None:
try:
parsed_date = time.strptime(start_date, '%Y%m%d%H')
except ValueError:
pass
else:
year = '%04d' % parsed_date.tm_year
month = '%02d' % parsed_date.tm_mon
day = '%02d' % parsed_date.tm_mday
hour = '%02d' % parsed_date.tm_hour
search_key = '/'.join([year, month, day, hour])
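                # e.g. start_date '2011010112' becomes the marker
                # '2011/01/01/12'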
end_key = None
if end_date is not None:
try:
parsed_date = time.strptime(end_date, '%Y%m%d%H')
except ValueError:
pass
else:
year = '%04d' % parsed_date.tm_year
month = '%02d' % parsed_date.tm_mon
day = '%02d' % parsed_date.tm_mday
# Since the end_marker filters by <, we need to add something
# to make sure we get all the data under the last hour. Adding
# one to the hour should be all-inclusive.
hour = '%02d' % (parsed_date.tm_hour + 1)
end_key = '/'.join([year, month, day, hour])
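                # e.g. end_date '2011010112' becomes the end_marker
                # '2011/01/01/13'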
container_listing = self.internal_proxy.get_container_list(
swift_account,
container_name,
marker=search_key,
end_marker=end_key)
results = []
if listing_filter is None:
listing_filter = set()
for item in container_listing:
name = item['name']
if name not in listing_filter:
results.append(name)
return results


def multiprocess_collate(processor_klass, processor_args, processor_method,
items_to_process, worker_count, logger=None):
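    """Generator: farm items_to_process out to worker_count worker
    processes and yield (item, result) pairs as results arrive."""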
    workers = []
in_queue = multiprocessing.Queue()
out_queue = multiprocessing.Queue()
for _junk in range(worker_count):
p = multiprocessing.Process(target=collate_worker,
args=(processor_klass,
processor_args,
processor_method,
in_queue,
out_queue,
logger))
p.start()
        workers.append(p)
for x in items_to_process:
in_queue.put(x)
for _junk in range(worker_count):
in_queue.put(None) # tell the worker to end
in_queue.close()
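    # Drain out_queue, yielding results as they arrive, until every
    # worker has exited and nothing is left in the queue.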
while True:
try:
item, data = out_queue.get_nowait()
except Queue.Empty:
sleep(.01)
except Exception:
if logger:
logger.exception('error reading from out queue')
else:
if isinstance(data, WorkerError):
if logger:
logger.error(data.tb_str)
else:
yield item, data
        if not any(r.is_alive() for r in workers) and out_queue.empty():
# all the workers are done and nothing is in the queue
break
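
# Illustrative usage (hypothetical names, not part of this module). Each
# item must be an argument tuple, since the worker calls method(*item):
#
#   pairs = multiprocess_collate(AccessLogProcessor, (conf,),
#                                'process_one_file', log_files, 4, logger)
#   for item, result in pairs:
#       collate(item, result)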


def collate_worker(processor_klass, processor_args, processor_method, in_queue,
out_queue, logger=None):
"""worker process for multiprocess_collate"""
try:
p = processor_klass(*processor_args)
while True:
item = in_queue.get()
if item is None:
# no more work to process
break
try:
method = getattr(p, processor_method)
except AttributeError:
return
try:
ret = method(*item)
except Exception:
                tb = sys.exc_info()[2]
                # Send back only the formatted traceback string;
                # unpickling the exception itself in the parent process
                # would fail if it has a custom constructor with
                # required parameters.
                ret = WorkerError()
                ret.tb_str = ''.join(traceback.format_tb(tb))
out_queue.put((item, ret))
except Exception:
if logger:
logger.exception('Error in worker')
finally:
in_queue.close()
out_queue.close()