removed temporary ____context____ key and fixed pump

This commit is contained in:
Sandy Walsh 2014-08-14 20:51:40 +00:00
parent fa5aced98b
commit 8c53a567b5
3 changed files with 24 additions and 13 deletions

View File

@ -24,9 +24,10 @@ queue = driver.create_queue(queue_name, exchange, queue_name,
queue.declare()
template_dir = sys.argv[1]
rate = int(sys.argv[2])
print "Using template dir:", template_dir
g = notigen.EventGenerator(template_dir, 1)
g = notigen.EventGenerator(template_dir, rate)
now = datetime.datetime.utcnow()
start = now
end = now + datetime.timedelta(days=1)

View File

@ -32,7 +32,8 @@ class DateTimeEncoder(json.JSONEncoder):
def get_all_events(cnx, date_range, next_range):
# Get all the events (including EOD .exists) for a day
query = ("SELECT stacktach_rawdata.when AS d, stacktach_rawdata.event as event, "
query = ("SELECT stacktach_rawdata.when AS d, "
"stacktach_rawdata.event as event, "
"stacktach_rawdata.json AS rawjson "
"FROM stacktach_rawdata "
"WHERE (stacktach_rawdata.when BETWEEN %f AND %f) "
@ -49,13 +50,15 @@ def get_all_events(cnx, date_range, next_range):
aend = dateutil.parser.parse(payload['audit_period_ending'])
# Ignore these EOD events, they're in tomorrow's batch.
if astart.time() == datetime.time.min and aend.time() == datetime.time.min:
if (astart.time() == datetime.time.min and
aend.time() == datetime.time.min):
# print "EOD .exists at: %s (%s -> %s)" % (hwhen, astart, aend)
continue
payloads.append((hwhen, full))
cursor.close()
query = ("SELECT stacktach_rawdata.when AS d, stacktach_rawdata.json AS rawjson "
query = ("SELECT stacktach_rawdata.when AS d, "
"stacktach_rawdata.json AS rawjson "
"FROM stacktach_rawdata "
"WHERE (stacktach_rawdata.when BETWEEN %f AND %f) "
"AND stacktach_rawdata.event='compute.instance.exists' "
@ -70,7 +73,8 @@ def get_all_events(cnx, date_range, next_range):
aend = dateutil.parser.parse(payload['audit_period_ending'])
# Ignore these EOD events, they're in tomorrow's batch.
if astart.time() != datetime.time.min or aend.time() != datetime.time.min:
if (astart.time() != datetime.time.min or
aend.time() != datetime.time.min):
# print "Instant .exists at: %s (%s -> %s)" % (hwhen, astart, aend)
continue
payloads.append((hwhen, full))
@ -99,7 +103,8 @@ newest = nu.dt_from_decimal(maxdate)
print "Events from %s to %s" % (oldest, newest)
start_of_full_day = oldest.replace(hour=0, minute=0, second=0, microsecond=0)
end_of_full_day = oldest.replace(hour=23, minute=59, second=59, microsecond=999999)
end_of_full_day = oldest.replace(hour=23, minute=59, second=59,
microsecond=999999)
start_of_full_day = start_of_full_day + datetime.timedelta(days=1)
end_of_full_day = end_of_full_day + datetime.timedelta(days=1)
@ -221,8 +226,8 @@ def _replace_dict_key(data, value, old_key):
return _inner
uuid_regex = re.compile(r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}",
re.IGNORECASE)
uuid_regex = re.compile(r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-"
r"[0-9a-f]{4}-[0-9a-f]{12}", re.IGNORECASE)
xuuid_regex = re.compile(r"[0-9a-f]{32}", re.IGNORECASE)
v4_regex = re.compile(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}")
v6_regex = re.compile("([0-9a-f]{1,4}:){7}[0-9a-f]{1,4}", re.IGNORECASE)
@ -304,10 +309,11 @@ def scrub(context, struct, parent):
if m:
source = struct
original = struct[m.start():m.end()]
full_template = struct[:m.start()] + ptemplate + struct[m.end():]
full_template = struct[:m.start()] + ptemplate \
+ struct[m.end():]
#print "Found %s:" % pkey, source
struct, index = placeholder(full_template, original, context, pkey,
ptemplate, parent)
struct, index = placeholder(full_template, original, context,
pkey, ptemplate, parent)
if pkey == 'dt':
start = context['_start_time']
time_map = context['_time_map']
@ -321,7 +327,8 @@ for stream_type, streams_by_len in patterns.iteritems():
for when, uuid, event, rawjson in stream:
# All datetimes will be relative to the first event's timestamp.
if not context.get('_start_time'):
context['_start_time'] = dateutil.parser.parse(rawjson['timestamp'])
context['_start_time'] = dateutil.parser.parse(
rawjson['timestamp'])
scrub(context, rawjson, None)
output.append(rawjson)

View File

@ -137,7 +137,9 @@ class EventGenerator(object):
else:
self.instances_in_use.add(uuid)
print "%s %4s-%s %s" % (when, uuid[-4:], operation[17:], event['event_type'])
print "%s %4s-%s %s" % (when, uuid[-4:], operation[17:],
event['event_type'])
del event['____context____']
ready.append(event)
if (now - self.last_exists).days > 0:
@ -154,6 +156,7 @@ class EventGenerator(object):
operation, context_hints, template)
for when, event in sequence:
payload = event['payload']
del event['____context____']
payload['audit_period_beginning'] = audit_period_start
payload['audit_period_ending'] = audit_period_start
payload['instance_id'] = instance