diff --git a/demo/post_webhook.py b/demo/post_webhook.py index 67ad7fb..bd441bf 100644 --- a/demo/post_webhook.py +++ b/demo/post_webhook.py @@ -1,13 +1,39 @@ +import datetime import json import requests -import time -import random + +import kafka.client +import kafka.common +import kafka.consumer address = "http://192.168.10.4:8765/events.php" -while True: - t = random.uniform(100, 10000) - body = {'VM Create time': '{}'.format(t), +kc = kafka.client.KafkaClient("192.168.10.4:9092") +consumer = kafka.consumer.SimpleConsumer(kc, + "Foo", + "stream-notifications", + auto_commit=False) + +for raw_event in consumer: + event = json.loads(raw_event.message.value) + + times = {} + + for e in event['events']: + times[e['event_type']] = e['timestamp'] + + try: + microseconds_per_second = 1000000 + time_format = '%Y-%m-%dT%H:%M:%S.%f' + start = datetime.datetime.strptime(times['compute.instance.create.start'], + time_format) + end = datetime.datetime.strptime(times['compute.instance.create.end'], + time_format) + duration = ((end - start).total_seconds() * microseconds_per_second) + except Exception: + continue + + body = {'VM Create time': '{}'.format(duration), 'units': 'ms'} headers = {'content-type': 'application/json'} @@ -17,6 +43,3 @@ while True: headers=headers) except Exception as e: print("unable to post") - print e - - time.sleep(3)