removed the old generator. Template only now.

Also changed the operations_per_minute to operations_per_hour
so we can get .exists with fewer events being generated.
This commit is contained in:
Sandy Walsh 2014-08-13 20:29:21 -07:00
parent 65ebc00aae
commit b208de357c
1 changed files with 80 additions and 303 deletions

View File

@ -15,7 +15,8 @@
"""OpenStack-like Notification Generation Library
Built from work done in https://github.com/SandyWalsh/twobillion
Originally built from work done in
https://github.com/SandyWalsh/twobillion
"""
import datetime
@ -30,57 +31,11 @@ import sys
import time
import uuid
COMPUTE_EVENTS = [
'compute.instance.finish_resize.*',
'compute.instance.power_off.*',
'compute.instance.power_on.*',
'compute.instance.reboot.*',
'compute.instance.rebuild.*',
'compute.instance.resize.confirm.*',
'compute.instance.resize.prep.*',
'compute.instance.resize.revert.*',
'compute.instance.resize.*',
'compute.instance.shutdown.*',
'compute.instance.snapshot.*',
'compute.instance.suspend',
'compute.instance.resume',
'compute.instance.update',
'attach_volume',
'change_instance_metadata',
'detach_volume',
'finish_resize',
'finish_revert_resize',
'get_vnc_console',
'power_on_instance',
'prep_resize',
'reboot_instance',
'rebuild_instance',
'rescue_instance',
'reserve_block_device_name',
'resize_instance',
'revert_resize',
'run_instance',
'set_admin_password',
'snapshot_instance',
'start_instance',
'suspend_instance',
'terminate_instance',
'unrescue_instance']
SCHEDULER_EVENTS = ['scheduler.run_instance.start',
'scheduler.run_instance.scheduled',
'scheduler.run_instance.end']
SCHEDULERS = ['scheduler_%02d' % x for x in xrange(3)]
COMPUTE_NODES = ['compute_%03d' % x for x in xrange(100)]
API_NODES = ['api.server.%02d' % x for x in xrange(10)]
import dateutil.parser
class EventGenerator(object):
def __init__(self, operations_per_minute=1000, exists_hours=1):
def __init__(self, template_dir, operations_per_hour=1, exists_hours=24):
self.exists_hours = exists_hours # num hours between .exists
self.instances = {} # { uuid: compute_node }
@ -99,7 +54,7 @@ class EventGenerator(object):
# (like instance.delete starting while instance.create is still underway)
# That's too much headache.
operations_per_second = float(operations_per_minute) / 60.0
operations_per_second = float(operations_per_hour) / 3600.0
# An operation will happen every so many milliseconds to
# get our operations/sec. We call this a Tick.
@ -113,17 +68,8 @@ class EventGenerator(object):
self.tick = now + datetime.timedelta(
milliseconds=self.millisecond_per_tick)
self.last_exists = now # When were last .exists sent?
def move_to_next_tick(self, now):
return now + datetime.timedelta(milliseconds=self.millisecond_per_tick)
class TemplateEventGenerator(EventGenerator):
def __init__(self, template_dir, operations_per_minute=1000,
exists_hours=24):
super(TemplateEventGenerator, self).__init__(operations_per_minute,
exists_hours)
# When were last .exists sent?
self.last_exists = now.replace(hour=0, minute=0, second=0, microsecond=0)
# Load all the templates ...
template_filenames = [f for f in os.listdir(template_dir)
@ -138,11 +84,19 @@ class TemplateEventGenerator(EventGenerator):
template = json.load(f)
# Keep it as a raw string to make replacements easier ...
raw = json.dumps(template[1:], sort_keys=True, indent=4)
self.templates.append((filename, template[0], raw))
if filename == "eod_exists.json":
self.exists_template = (template[0], raw)
else:
operation = filename[:filename.index('_')]
self.templates.append((operation, template[0], raw))
def move_to_next_tick(self, now):
return now + datetime.timedelta(milliseconds=self.millisecond_per_tick)
def generate(self, now):
self._add_new_sequence(now)
return [] # self._get_ready_events(now)
return self._get_ready_events(now)
def _add_new_sequence(self, now):
"""Add a new operation to the queue.
@ -154,21 +108,71 @@ class TemplateEventGenerator(EventGenerator):
context, sequence = self._get_sequence(now)
for idx, when_event in enumerate(sequence):
when, event = when_event
print when, event['event_type']
event['____context____'] = context # delete before returning
# (when, is_first_event, is_last_event)
heapq.heappush(self.next_events,
(when, idx==0, idx==len(sequence)-1))
print "------------------------------"
(when, event, idx==0, idx==len(sequence)-1))
self.tick = self.move_to_next_tick(now)
return now
def _get_ready_events(self, now):
"""Pump out the events that are due now."""
ready = []
while True:
if not self.next_events:
return ready
when, event, start, end = self.next_events[0] # peek
if when > now:
break
when, event, start, end = heapq.heappop(self.next_events)
context = event['____context____']
operation = context['operation']
uuid = context['instance_id']
if end:
if operation == 'compute.instance.delete.start':
try:
self.instances_in_use.remove(uuid)
except KeyError:
pass
else:
self.instances_in_use.add(uuid)
print "%s %4s-%s %s" % (when, uuid[-4:], operation[17:], event['event_type'])
ready.append(event)
if (now - self.last_exists).days > 0:
flattened = now.replace(hour=0, minute=0, second=0, microsecond=0)
self.last_exists = flattened
audit_period_start = flattened - datetime.timedelta(days=1)
audit_period_end = flattened
operation = "eod-exists"
context_hints, template = self.exists_template
for instance in self.instances_in_use:
context, sequence = self._make_sequence_from_template(now,
operation, context_hints, template)
for when, event in sequence:
payload = event['payload']
payload['audit_period_beginning'] = audit_period_start
payload['audit_period_ending'] = audit_period_start
payload['instance_id'] = instance
print "%s %s %s" % (now, instance, event['event_type'])
ready.append(event)
return ready
def _get_sequence(self, now):
"""Grab a template and make a sequence from it.
"""
sequence = []
filename, context_hints, template = random.choice(self.templates)
print "Using", filename
operation, context_hints, template = random.choice(self.templates)
return self._make_sequence_from_template(now, operation,
context_hints, template)
def _make_sequence_from_template(self, now, operation, context_hints,
template):
sequence = []
context = {}
time_map = context_hints['time_map']
for key, values in time_map.iteritems():
@ -219,252 +223,25 @@ class TemplateEventGenerator(EventGenerator):
if instance_id and instance_id != inst_id:
print "changing instance id", instance_id, inst_id
instance_id = inst_id
sequence.append((event['timestamp'], event))
when = dateutil.parser.parse(event['timestamp'])
sequence.append((when, event))
context['instance_id'] = instance_id
context['operation'] = operation
return context, sorted(sequence)
class TinyEventGenerator(EventGenerator):
# Generates OpenStack-like events without event templates.
# The event payloads are not complete and the event sequences
# are mostly made up.
# If you have a StackTach.v2 deployment, you can use the
# template generator in ./bin to make your own templates.
def generate(self, now):
self._add_new_sequence(now)
events = self._get_ready_events(now)
for event in events:
event['when'] = str(event['when'])
event['message_id'] = str(uuidlib.uuid4())
return events
def _add_new_sequence(self, now):
"""Add a new operation to the queue.
This is the entire sequence of events going into
the future. They will be interwoven with other
future events and pumped out in proper (interleaving)
order."""
if now >= self.tick:
action = self._get_action(now)
for idx, event in enumerate(action):
when = event['when']
if idx == 0:
uuid = event['uuid'][-4:]
request = event['request_id'][-4:]
if False:
if event['is_create']:
print "CREATE:",
if event['is_delete']:
print "DELETE:",
if event['is_update']:
print "UPDATE:",
print "U:%s R:%s" % (uuid, request),
print "(%d of %d)" % (len(self.instances_in_use), \
len(self.instances))
# (when, event, is_first_event, is_last_event)
heapq.heappush(self.next_events,
(when, event, idx==0, idx==len(action)-1))
self.tick = self.move_to_next_tick(now)
return now
def _get_ready_events(self, now):
"""Pump out all the ready events."""
ready = []
while True:
if not self.next_events:
return ready
when, event, start, end = self.next_events[0] # peek
if when > now:
break
when, event, start, end = heapq.heappop(self.next_events)
uuid = event['uuid']
request = event['request_id']
if end:
if event['is_create']:
self.instances_in_use.add(uuid)
elif event['is_delete']:
self.instances_in_use.remove(uuid)
#print "%s %40s U:%4s" % (' ' * 20, event['event_type'], uuid[-4:])
ready.append(event)
# Send .exists every N hours for all active instances.
# Ensure the datetime of the .exists notification is HH:MM = X:00
# so we have regular blocks. In the situation we were doing
# End-of-Day .exists, we'd want the datetime to be 00:00
# like Nova does by default.
if now.minute < self.last_exists.minute:
# Minute rollover occured.
if (now - self.last_exists).seconds > (self.exists_hours*3600):
flattened = now.replace(minute=0, second=0, microsecond=0)
if self.exists_hours > 23:
flattened = now.replace(hour=0, minute=0, second=0,
microsecond=0)
self.last_exists = now
for instance in self.instances_in_use:
audit_period_start = flattened - datetime.timedelta(
hours=self.exists_hours)
audit_period_end = flattened
base = {'uuid': instance,
'audit_period_start': audit_period_start,
'audit_period_end': audit_period_end}
events, now = self._event(now,
base, "exists_node",
"compute.instance.exists")
ready.extend(events)
return ready
def _get_action(self, now):
"""Get an action sequence. A series of related events
that perform an operation. At this stage all it has
is a request_id."""
request_id = "req_" + str(uuidlib.uuid4())
base = {'request_id': request_id}
return self._make_action(now, base)
def _make_action(self, now, base):
"""Start creating records that look like OpenStack events.
api [-> scheduler] -> compute node.
instances_in_use is different than instances.keys():
instances.keys() is the list of all instances, even instances that
don't exist yet, but will be created in the near future.
instance_in_use are the instances in the current timeline.
While there are no in-use instances, create new ones.
After that, 10% chance of new instance. Otherwise,
20% chance it's a delete. The remaining 80% are
instance update operations.
"""
event_chain = []
is_create = random.randrange(100) < 10
is_delete = False
is_update = False
uuid = str(uuidlib.uuid4())
compute_node = random.choice(COMPUTE_NODES)
if not is_create and not self.instances_in_use:
is_create = True
if not is_create:
temp_uuid = random.choice(list(self.instances_in_use))
try:
compute_node = self.instances[temp_uuid]
uuid = temp_uuid
# 20% of the time it's a Delete, otherwise an Update ...
is_delete = random.randrange(100) < 20
if not is_delete:
is_update = True
except KeyError:
# The instance is in the process of being deleted.
is_create = True
if not (is_create or is_delete or is_update):
raise Exception("Why?!")
is_create = True
nbase = {'uuid': uuid, 'is_create': is_create, 'is_delete': is_delete,
'is_update': is_update}
nbase.update(base)
# All operations start with an API call ...
api, now = self._mk_event(now, nbase, API_NODES,
['compute.instance.update'])
now = self._bump_time(now, 0.5, 3.0) # From api to service
event_chain.extend(api)
if is_create:
scheduler_node = random.choice(SCHEDULERS)
for e in SCHEDULER_EVENTS:
z, now = self._event(now, nbase, scheduler_node, e)
event_chain.extend(z)
now = self._bump_time(now, 0.1, 0.5) # inside scheduler
now = self._bump_time(now, 0.5, 3.0) # In Compute node
z, now = self._event(now, nbase, compute_node,
'compute.instance.create.*')
event_chain.extend(z)
self.instances[uuid] = compute_node
if is_delete:
z, now = self._event(now, nbase, compute_node,
'compute.instance.delete.*')
event_chain.extend(z)
del self.instances[uuid]
if is_update:
event = random.choice(COMPUTE_EVENTS)
z, now = self._event(now, nbase, compute_node, event)
event_chain.extend(z)
# End the chain with a .exists record
now = self._bump_time(now, 0.1, 0.5)
z, now = self._event(now, nbase, compute_node, "compute.instance.exists")
event_chain.extend(z)
return event_chain
def _bump_time(self, now, low, high):
"""Create a random time in fractional seconds and move now ahead
that amount."""
secs = low + ((high - low) * random.random())
return now + datetime.timedelta(seconds=secs)
def _mk_event(self, now, base, nodes, events):
"""Make a single event with random node/events.
If the event name ends in .* we will generate
the corresponding .start and .end events
while we're at it."""
return self._event(now, base, random.choice(nodes),
random.choice(events))
def _event(self, now, base, node, event):
"""Make a single event or a pair of events (depending on the
event type)"""
results = []
if event[-1] == '*':
event = event[0:-1]
extra = {'when': now, 'node': node}
results.append(self._pkg(base, extra,
{'event_type': event + "start"}))
now = self._bump_time(now, 0.25, 60.0 * 15.0) # In compute node
extra = {'when': now, 'node': node}
results.append(self._pkg(base, extra, {'event_type': event + "end"}))
else:
extra = {'when': now, 'node': node}
results.append(self._pkg(base, extra, {'event_type': event}))
return results, now
def _pkg(self, *args):
"""Pack together a bunch of dict's into a single dict."""
new = {}
for a in args:
new.update(a)
return new
if __name__ == '__main__':
real_time = False
# The lower the ops/minute, the longer it will
# take to get N events. This is useful for getting
# .exists generated (hourly).
g = TemplateEventGenerator("templates", 1)
g = EventGenerator("templates", 1)
now = datetime.datetime.utcnow()
print "starting at", now
end = now + datetime.timedelta(days=1) # ensure .exists get generated
start = now
nevents = 0
while nevents < 10000:
while now < end:
e = g.generate(now)
if e:
nevents += len(e)