diff --git a/bin/event_pump.py b/bin/event_pump.py index 64d16c9..daed6f5 100644 --- a/bin/event_pump.py +++ b/bin/event_pump.py @@ -24,9 +24,10 @@ queue = driver.create_queue(queue_name, exchange, queue_name, queue.declare() template_dir = sys.argv[1] +rate = int(sys.argv[2]) print "Using template dir:", template_dir -g = notigen.EventGenerator(template_dir, 1) +g = notigen.EventGenerator(template_dir, rate) now = datetime.datetime.utcnow() start = now end = now + datetime.timedelta(days=1) diff --git a/bin/templategen.py b/bin/templategen.py index 68851fa..2af4a89 100644 --- a/bin/templategen.py +++ b/bin/templategen.py @@ -32,7 +32,8 @@ class DateTimeEncoder(json.JSONEncoder): def get_all_events(cnx, date_range, next_range): # Get all the events (including EOD .exists) for a day - query = ("SELECT stacktach_rawdata.when AS d, stacktach_rawdata.event as event, " + query = ("SELECT stacktach_rawdata.when AS d, " + "stacktach_rawdata.event as event, " "stacktach_rawdata.json AS rawjson " "FROM stacktach_rawdata " "WHERE (stacktach_rawdata.when BETWEEN %f AND %f) " @@ -49,13 +50,15 @@ def get_all_events(cnx, date_range, next_range): aend = dateutil.parser.parse(payload['audit_period_ending']) # Ignore these EOD events, they're in tomorrow's batch. - if astart.time() == datetime.time.min and aend.time() == datetime.time.min: + if (astart.time() == datetime.time.min and + aend.time() == datetime.time.min): # print "EOD .exists at: %s (%s -> %s)" % (hwhen, astart, aend) continue payloads.append((hwhen, full)) cursor.close() - query = ("SELECT stacktach_rawdata.when AS d, stacktach_rawdata.json AS rawjson " + query = ("SELECT stacktach_rawdata.when AS d, " + "stacktach_rawdata.json AS rawjson " "FROM stacktach_rawdata " "WHERE (stacktach_rawdata.when BETWEEN %f AND %f) " "AND stacktach_rawdata.event='compute.instance.exists' " @@ -70,7 +73,8 @@ def get_all_events(cnx, date_range, next_range): aend = dateutil.parser.parse(payload['audit_period_ending']) # Ignore these EOD events, they're in tomorrow's batch. - if astart.time() != datetime.time.min or aend.time() != datetime.time.min: + if (astart.time() != datetime.time.min or + aend.time() != datetime.time.min): # print "Instant .exists at: %s (%s -> %s)" % (hwhen, astart, aend) continue payloads.append((hwhen, full)) @@ -99,7 +103,8 @@ newest = nu.dt_from_decimal(maxdate) print "Events from %s to %s" % (oldest, newest) start_of_full_day = oldest.replace(hour=0, minute=0, second=0, microsecond=0) -end_of_full_day = oldest.replace(hour=23, minute=59, second=59, microsecond=999999) +end_of_full_day = oldest.replace(hour=23, minute=59, second=59, + microsecond=999999) start_of_full_day = start_of_full_day + datetime.timedelta(days=1) end_of_full_day = end_of_full_day + datetime.timedelta(days=1) @@ -221,8 +226,8 @@ def _replace_dict_key(data, value, old_key): return _inner -uuid_regex = re.compile(r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", - re.IGNORECASE) +uuid_regex = re.compile(r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-" + r"[0-9a-f]{4}-[0-9a-f]{12}", re.IGNORECASE) xuuid_regex = re.compile(r"[0-9a-f]{32}", re.IGNORECASE) v4_regex = re.compile(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}") v6_regex = re.compile("([0-9a-f]{1,4}:){7}[0-9a-f]{1,4}", re.IGNORECASE) @@ -304,10 +309,11 @@ def scrub(context, struct, parent): if m: source = struct original = struct[m.start():m.end()] - full_template = struct[:m.start()] + ptemplate + struct[m.end():] + full_template = struct[:m.start()] + ptemplate \ + + struct[m.end():] #print "Found %s:" % pkey, source - struct, index = placeholder(full_template, original, context, pkey, - ptemplate, parent) + struct, index = placeholder(full_template, original, context, + pkey, ptemplate, parent) if pkey == 'dt': start = context['_start_time'] time_map = context['_time_map'] @@ -321,7 +327,8 @@ for stream_type, streams_by_len in patterns.iteritems(): for when, uuid, event, rawjson in stream: # All datetimes will be relative to the first event's timestamp. if not context.get('_start_time'): - context['_start_time'] = dateutil.parser.parse(rawjson['timestamp']) + context['_start_time'] = dateutil.parser.parse( + rawjson['timestamp']) scrub(context, rawjson, None) output.append(rawjson) diff --git a/notigen/__init__.py b/notigen/__init__.py index 2ff7135..c2ac6c6 100644 --- a/notigen/__init__.py +++ b/notigen/__init__.py @@ -137,7 +137,9 @@ class EventGenerator(object): else: self.instances_in_use.add(uuid) - print "%s %4s-%s %s" % (when, uuid[-4:], operation[17:], event['event_type']) + print "%s %4s-%s %s" % (when, uuid[-4:], operation[17:], + event['event_type']) + del event['____context____'] ready.append(event) if (now - self.last_exists).days > 0: @@ -154,6 +156,7 @@ class EventGenerator(object): operation, context_hints, template) for when, event in sequence: payload = event['payload'] + del event['____context____'] payload['audit_period_beginning'] = audit_period_start payload['audit_period_ending'] = audit_period_start payload['instance_id'] = instance