pump events from STv2 db

This requires the introduction of the time_sync.py service.
A trivial HTTP service that allows the POST/GET of timestamps.
This can used between the consumer and stream pipeline processor
to synchronize time when replaying old events (so they don't
expire too prematurely).

Change-Id: Iba61b3b75f68a19869d06a0484d2676dd44fdc60
This commit is contained in:
Sandy Walsh 2014-10-08 07:15:55 -07:00
parent 2dfc24c716
commit d2b591913b
2 changed files with 121 additions and 0 deletions

87
bin/pump_from_stv2.py Normal file
View File

@ -0,0 +1,87 @@
"""Pump simulated OpenStack notificationss into RabbitMQ.
You need to install rabbitqm-server and
pip install librabbitmq
pip install --pre notabene
pip install --pre notification_utils
"""
import datetime
import json
import sys
import time
import mysql.connector
from notabene import kombu_driver as driver
import notification_utils as nu
connection = driver.create_connection("localhost", 5672, 'guest', 'guest',
"librabbitmq", "/")
exchange = driver.create_exchange("monitor", "topic")
queue_name = "monitor.info"
queue = driver.create_queue(queue_name, exchange, queue_name,
channel=connection.channel())
queue.declare()
cnx = mysql.connector.connect(user='root', password='password',
host='127.0.0.1',
database='stacktach')
cursor = cnx.cursor()
query = ("SELECT min(stacktach_rawdata.when) AS mindate, "
"MAX(stacktach_rawdata.when) AS maxdate "
"FROM stacktach_rawdata")
cursor.execute(query)
mindate, maxdate = list(cursor)[0]
cursor.close()
oldest = nu.dt_from_decimal(mindate)
newest = nu.dt_from_decimal(maxdate)
# Move to the next day so we get all the events for the day.
start = oldest.replace(hour=0,minute=0,second=0,microsecond=0) \
+ datetime.timedelta(days=1)
end = start + datetime.timedelta(days=1, hours=4)
print "Events from %s to %s" % (oldest, newest)
print "Extracting events from %s to %s" % (start, end)
time.sleep(5)
dstart = nu.dt_to_decimal(start)
dend = nu.dt_to_decimal(end)
cursor = cnx.cursor()
query = ("SELECT stacktach_rawdata.when AS d, "
"stacktach_rawdata.json AS rawjson "
"FROM stacktach_rawdata "
"WHERE (stacktach_rawdata.when BETWEEN %f AND %f) "
"AND stacktach_rawdata.event!='compute.instance.updates' "
"AND stacktach_rawdata.event!='compute.instance.exists.verified' "
"ORDER BY stacktach_rawdata.when LIMIT 100000" % (dstart, dend))
cursor.execute(query)
start = None
end = None
num = 0
for when, rawjson in cursor:
when = nu.dt_from_decimal(when)
if not start:
start = when
end = start + datetime.timedelta(days=1)
if when > end:
break
queue, event = json.loads(rawjson)
# Skip the noise ...
if event['event_type'] in ['compute.instance.update', 'compute.instance.exists.verified']:
continue
print when, event['event_type']
driver.send_notification(event, queue_name, connection, exchange)
num+=1
print "Published %d events" % num
cursor.close()
cnx.close()

34
bin/time_sync.py Normal file
View File

@ -0,0 +1,34 @@
import datetime
import falcon
# gunicorn --log-file=- 'time_sync:get_api()'
#
# To get the current time
# curl localhost:8000/time
#
# To set the current time
# curl --data "2014-10-09 22:55:33.111111" localhost:8000/time
class TimeResource(object):
def __init__(self):
self.last_time = None
def on_get(self, req, resp):
resp.body = self.last_time
print "GET", self.last_time
def on_post(self, req, resp):
chunk = req.stream.read(4096)
self.last_time = chunk
print "POST", self.last_time
api = falcon.API()
time_resource = TimeResource()
api.add_route('/time', time_resource)
def get_api():
return api