diff --git a/etc/log-processor.conf-sample b/etc/log-processor.conf-sample
index 9d2d1fb..010b5ee 100644
--- a/etc/log-processor.conf-sample
+++ b/etc/log-processor.conf-sample
@@ -10,6 +10,7 @@ swift_account = AUTH_test
 # lookback_window = 120
 # user = swift
 # format_type = csv
+# time_zone = UTC
 
 [log-processor-access]
 # log_dir = /var/log/swift/
@@ -35,6 +36,7 @@ class_path = slogging.access_processor.AccessLogProcessor
 # list of swift.sources (see swift/proxy/server.py posthooklogger)
 # that count as service traffic
 # content_type =
+# time_zone = UTC
 
 [log-processor-stats]
 # log_dir = /var/log/swift/
@@ -47,6 +49,7 @@ class_path = slogging.stats_processor.StatsLogProcessor
 # mount_check = true
 # user = swift
 # content_type =
+# time_zone = UTC
 
 [log-processor-container-stats]
 # log_dir = /var/log/swift/
@@ -61,3 +64,4 @@ processable = false
 # user = swift
 # metadata_keys = comma separated list of user metadata keys to be collected
 # content_type =
+# time_zone = UTC
diff --git a/slogging/access_processor.py b/slogging/access_processor.py
index 2bf4f91..27ec069 100644
--- a/slogging/access_processor.py
+++ b/slogging/access_processor.py
@@ -16,6 +16,10 @@
 import collections
 from urllib import unquote
 import copy
+from tzlocal import get_localzone
+from datetime import datetime
+from slogging import common
+import pytz
 
 # conditionalize the return_ips method based on whether or not iptools
 # is present in the system. Without iptools, you will lack CIDR support.
@@ -43,6 +47,7 @@ from swift.common.utils import split_path, get_logger
 month_map = '_ Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split()
 LISTING_PARAMS = set(
     'path limit format delimiter marker end_marker prefix'.split())
+local_zone = get_localzone()
 
 
 class AccessLogProcessor(object):
@@ -54,6 +59,8 @@ class AccessLogProcessor(object):
             setattr(self, conf_tag, return_ips(conf, conf_tag))
         self.warn_percent = float(conf.get('warn_percent', '0.8'))
         self.logger = get_logger(conf, log_route='access-processor')
+        self.time_zone = common.get_time_zone(conf, self.logger, 'time_zone',
+                                              str(local_zone))
 
     def log_line_parser(self, raw_log):
         '''given a raw access log line, return a dict of the good parts'''
@@ -186,7 +193,17 @@ class AccessLogProcessor(object):
         elif object_name:
             op_level = 'object'
 
-        aggr_key = (account, year, month, day, hour)
+        utc_line_date = datetime(int(year), int(month), int(day),
+                                 int(hour), int(minute), int(second),
+                                 tzinfo=pytz.utc)
+        line_date = utc_line_date.astimezone(self.time_zone)
+        line_date_year = line_date.strftime('%Y')
+        line_date_month = line_date.strftime('%m')
+        line_date_day = line_date.strftime('%d')
+        line_date_hour = line_date.strftime('%H')
+
+        aggr_key = (account, line_date_year, line_date_month,
+                    line_date_day, line_date_hour)
         d = hourly_aggr_info.get(aggr_key, {})
         if CIDR_support:
             sanitize_ips(line_data)
diff --git a/slogging/common.py b/slogging/common.py
index bf21e20..81a9f08 100644
--- a/slogging/common.py
+++ b/slogging/common.py
@@ -13,6 +13,23 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import pytz
+
+
+def get_time_zone(conf, logger, key, default):
+    """
+    Get and check time_zone value.
+ """ + str_time_zone = conf.get(key, default) + try: + time_zone = pytz.timezone(str_time_zone) + except pytz.exceptions.UnknownTimeZoneError: + logger.warning( + _("Invalid Parameter %s: %s, " % (key, str_time_zone) + + "use default %s.") % default) + time_zone = pytz.timezone(default) + return time_zone + def get_format_type(conf, logger, key, default): """ diff --git a/slogging/db_stats_collector.py b/slogging/db_stats_collector.py index fb43ad3..ab96e89 100644 --- a/slogging/db_stats_collector.py +++ b/slogging/db_stats_collector.py @@ -15,6 +15,7 @@ import os import time +from datetime import datetime from paste.deploy import appconfig import shutil import hashlib @@ -29,6 +30,10 @@ from swift.common.utils import renamer, get_logger, readconf, mkdirs, \ TRUE_VALUES, remove_file from swift.common.constraints import check_mount from swift.common.daemon import Daemon +from slogging import common +from tzlocal import get_localzone + +local_zone = get_localzone() class DatabaseStatsCollector(Daemon): @@ -51,6 +56,8 @@ class DatabaseStatsCollector(Daemon): mkdirs(self.target_dir) self.logger = get_logger(stats_conf, log_route='%s-stats' % stats_type) + self.time_zone = common.get_time_zone(stats_conf, self.logger, + 'time_zone', str(local_zone)) def run_once(self, *args, **kwargs): self.logger.info(_("Gathering %s stats" % self.stats_type)) @@ -66,7 +73,8 @@ class DatabaseStatsCollector(Daemon): raise NotImplementedError('Subclasses must override') def find_and_process(self): - src_filename = time.strftime(self.filename_format) + src_filename = datetime.now(self.time_zone).strftime( + self.filename_format) working_dir = os.path.join(self.target_dir, '.%-stats_tmp' % self.stats_type) shutil.rmtree(working_dir, ignore_errors=True) diff --git a/slogging/log_processor.py b/slogging/log_processor.py index 9a31193..bc7c9c4 100644 --- a/slogging/log_processor.py +++ b/slogging/log_processor.py @@ -24,6 +24,7 @@ import multiprocessing import Queue import cPickle import hashlib +from tzlocal import get_localzone import json import io @@ -35,6 +36,7 @@ from slogging.log_common import LogProcessorCommon, multiprocess_collate, \ from slogging import common now = datetime.datetime.now +local_zone = get_localzone() class LogProcessor(LogProcessorCommon): @@ -139,6 +141,8 @@ class LogProcessorDaemon(Daemon): self.worker_count = int(c.get('worker_count', '1')) self._keylist_mapping = None self.processed_files_filename = 'processed_files.pickle.gz' + self.time_zone = common.get_time_zone(c, self.logger, 'time_zone', + str(local_zone)) self.format_type = common.get_format_type(c, self.logger, 'format_type', 'csv') @@ -159,13 +163,13 @@ class LogProcessorDaemon(Daemon): lookback_end = None else: delta_hours = datetime.timedelta(hours=self.lookback_hours) - lookback_start = now() - delta_hours + lookback_start = now(self.time_zone) - delta_hours lookback_start = lookback_start.strftime('%Y%m%d%H') if self.lookback_window == 0: lookback_end = None else: delta_window = datetime.timedelta(hours=self.lookback_window) - lookback_end = now() - \ + lookback_end = now(self.time_zone) - \ delta_hours + \ delta_window lookback_end = lookback_end.strftime('%Y%m%d%H') @@ -318,13 +322,15 @@ class LogProcessorDaemon(Daemon): all_account_stats = collections.defaultdict(dict) for (account, year, month, day, hour), d in final_info.items(): data_ts = datetime.datetime(int(year), int(month), - int(day), int(hour)) - time_stamp = data_ts.strftime('%Y/%m/%d %H:00:00') + int(day), int(hour), + tzinfo=self.time_zone) + time_stamp = 
                 hourly_account_stats = \
                     self.restructure_stats_dictionary(d)
                 all_account_stats[account].update({time_stamp:
                                                    hourly_account_stats})
-            output = {'stats_data': all_account_stats}
+            output = {'time_zone': str(self.time_zone),
+                      'stats_data': all_account_stats}
         else:  # csv
             sorted_keylist_mapping = sorted(self.keylist_mapping)
             columns = ['data_ts', 'account'] + sorted_keylist_mapping
@@ -375,12 +381,14 @@ class LogProcessorDaemon(Daemon):
         if self.format_type == 'json':
             out_buf = json.dumps(output, indent=2)
             h = hashlib.md5(out_buf).hexdigest()
-            upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.json.gz' % h
+            upload_name = datetime.datetime.now(self.time_zone).strftime(
+                '%Y/%m/%d/%H/') + '%s.json.gz' % h
             f = io.BytesIO(out_buf)
         else:
             out_buf = '\n'.join([','.join(row) for row in output])
             h = hashlib.md5(out_buf).hexdigest()
-            upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.csv.gz' % h
+            upload_name = datetime.datetime.now(self.time_zone).strftime(
+                '%Y/%m/%d/%H/') + '%s.csv.gz' % h
             f = cStringIO.StringIO(out_buf)
         self.log_processor.internal_proxy.upload_file(f,
                                                       self.log_processor_account,
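
For reviewers, a minimal standalone sketch (ours, not part of the patch; the account name, zone, and timestamp are made up) of what the reworked access processor now does per log line: the parsed UTC fields are shifted into the configured zone before the hourly aggregation key is built, which can move a request onto a different day than its UTC source.

    from datetime import datetime

    import pytz

    # Stand-in for what common.get_time_zone() returns when the conf
    # file says 'time_zone = Asia/Tokyo'.
    time_zone = pytz.timezone('Asia/Tokyo')

    # 2016-03-01 23:30:00 UTC, as parsed out of an access log line.
    utc_line_date = datetime(2016, 3, 1, 23, 30, 0, tzinfo=pytz.utc)
    line_date = utc_line_date.astimezone(time_zone)

    # Same shape as the new aggr_key in access_processor.py.
    aggr_key = ('AUTH_test',
                line_date.strftime('%Y'), line_date.strftime('%m'),
                line_date.strftime('%d'), line_date.strftime('%H'))
    print(aggr_key)  # ('AUTH_test', '2016', '03', '02', '08'): next day in JST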
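
And a quick check of the new get_time_zone() fallback path (again a sketch, not part of the patch; the plain dict stands in for a parsed conf section, and gettext.install mirrors what swift does at startup so the _() call inside the helper resolves):

    import gettext
    import logging

    from slogging import common

    gettext.install('slogging')
    logging.basicConfig()
    logger = logging.getLogger('demo')

    # A known zone name is handed straight to pytz.
    print(common.get_time_zone({'time_zone': 'Europe/Paris'}, logger,
                               'time_zone', 'UTC'))  # Europe/Paris

    # An unknown name logs the warning and falls back to the default.
    print(common.get_time_zone({'time_zone': 'Not/AZone'}, logger,
                               'time_zone', 'UTC'))  # UTC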