diff --git a/setup.cfg b/setup.cfg
index 2ba939e..60ea942 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,7 +1,7 @@
[metadata]
description-file = README.md
name = winchester
-version = 0.57
+version = 0.62
author = Monsyne Dragon
author_email = mdragon@rackspace.com
summary = An OpenStack notification event processing library.
diff --git a/tests/test_atompub.py b/tests/test_atompub.py
new file mode 100644
index 0000000..10888e7
--- /dev/null
+++ b/tests/test_atompub.py
@@ -0,0 +1,151 @@
+# Copyright (c) 2015 Rackspace
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import unittest2 as unittest
+
+import mock
+
+from winchester import pipeline_handler
+
+
+class TestException(Exception):
+ pass
+
+
+class TestAtomPubHandler(unittest.TestCase):
+
+ def test_constructor_event_types(self):
+ fakeurl = 'fake://'
+ h = pipeline_handler.AtomPubHandler(fakeurl)
+ self.assertEqual(h.included_types, ['*'])
+ self.assertEqual(h.excluded_types, [])
+
+ h = pipeline_handler.AtomPubHandler(fakeurl,
+ event_types='test.thing')
+ self.assertEqual(h.included_types, ['test.thing'])
+ self.assertEqual(h.excluded_types, [])
+
+ h = pipeline_handler.AtomPubHandler(fakeurl,
+ event_types=['test.thing'])
+ self.assertEqual(h.included_types, ['test.thing'])
+ self.assertEqual(h.excluded_types, [])
+
+ h = pipeline_handler.AtomPubHandler(fakeurl,
+ event_types=['!test.thing'])
+ self.assertEqual(h.included_types, ['*'])
+ self.assertEqual(h.excluded_types, ['test.thing'])
+
+ def test_match_type(self):
+ event_types = ["test.foo.bar", "!test.wakka.wakka"]
+ h = pipeline_handler.AtomPubHandler('fakeurl',
+ event_types=event_types)
+ self.assertTrue(h.match_type('test.foo.bar'))
+ self.assertFalse(h.match_type('test.wakka.wakka'))
+ self.assertFalse(h.match_type('test.foo.baz'))
+
+ event_types = ["test.foo.*", "!test.wakka.*"]
+ h = pipeline_handler.AtomPubHandler('fakeurl',
+ event_types=event_types)
+ self.assertTrue(h.match_type('test.foo.bar'))
+ self.assertTrue(h.match_type('test.foo.baz'))
+ self.assertFalse(h.match_type('test.wakka.wakka'))
+
+ def test_handle_events(self):
+ event_types = ["test.foo.*", "!test.wakka.*"]
+ h = pipeline_handler.AtomPubHandler('fakeurl',
+ event_types=event_types)
+ event1 = dict(event_type="test.foo.zazz")
+ event2 = dict(event_type="test.wakka.zazz")
+ event3 = dict(event_type="test.boingy")
+ events = [event1, event2, event3]
+ res = h.handle_events(events, dict())
+ self.assertEqual(events, res)
+ self.assertIn(event1, h.events)
+ self.assertNotIn(event2, h.events)
+ self.assertNotIn(event3, h.events)
+
+ def test_format_cuf_xml(self):
+ expected = (''
+ '')
+ d1 = datetime.datetime(2015, 8, 10, 0, 0, 0)
+ d2 = datetime.datetime(2015, 8, 11, 0, 0, 0)
+ d3 = datetime.datetime(2015, 8, 9, 15, 21, 0)
+ event = dict(message_id='1234-56789',
+ event_type='test.thing',
+ audit_period_beginning=d1,
+ audit_period_ending=d2,
+ launched_at=d3,
+ instance_id='98765-4321',
+ state='active',
+ state_description='',
+ rax_options='4')
+ extra = dict(data_center='TST1', region='TST')
+ h = pipeline_handler.AtomPubHandler('fakeurl',
+ extra_info=extra)
+ res, content_type = h.format_cuf_xml(event)
+ self.assertEqual(res, expected)
+ self.assertEqual(content_type, 'application/xml')
+
+ def test_generate_atom(self):
+ expected = (""""""
+ """urn:uuid:12-34"""
+ """"""
+ """"""
+ """Server"""
+ """TEST_CONTENT"""
+ """""")
+ event = dict(message_id='12-34',
+ original_message_id='56-78',
+ event_type='test.thing')
+ event_type = 'test.thing.bar'
+ ctype = 'test/thing'
+ content = 'TEST_CONTENT'
+ h = pipeline_handler.AtomPubHandler('fakeurl')
+ atom = h.generate_atom(event, event_type, content, ctype)
+ self.assertEqual(atom, expected)
+
+ @mock.patch.object(pipeline_handler.requests, 'post')
+ @mock.patch.object(pipeline_handler.AtomPubHandler, '_get_auth')
+ def test_send_event(self, auth, rpost):
+ test_headers = {'Content-Type': 'application/atom+xml',
+ 'X-Auth-Token': 'testtoken'}
+ auth.return_value = test_headers
+ test_response = mock.MagicMock('http response')
+ test_response.status_code = 200
+ rpost.return_value = test_response
+ h = pipeline_handler.AtomPubHandler('fakeurl', http_timeout=123,
+ wait_interval=10, max_wait=100)
+ test_atom = mock.MagicMock('atom content')
+
+ status = h._send_event(test_atom)
+
+ self.assertEqual(1, auth.call_count)
+ self.assertEqual(1, rpost.call_count)
+ rpost.assert_called_with('fakeurl',
+ data=test_atom,
+ headers=test_headers,
+ timeout=123)
+ self.assertEqual(status, 200)
diff --git a/tests/test_usage_handler.py b/tests/test_usage_handler.py
index cba3d0e..77def88 100644
--- a/tests/test_usage_handler.py
+++ b/tests/test_usage_handler.py
@@ -211,7 +211,7 @@ class TestUsageHandler(unittest.TestCase):
f['event_type'])
self.assertEqual("now", f['timestamp'])
self.assertEqual(123, f['stream_id'])
- self.assertEqual("inst", f['payload']['instance_id'])
+ self.assertEqual("inst", f['instance_id'])
self.assertEqual("None", f['error'])
self.assertIsNone(f['error_code'])
@@ -228,7 +228,7 @@ class TestUsageHandler(unittest.TestCase):
f['event_type'])
self.assertEqual("now", f['timestamp'])
self.assertEqual(123, f['stream_id'])
- self.assertEqual("inst", f['payload']['instance_id'])
+ self.assertEqual("inst", f['instance_id'])
self.assertEqual("Error", f['error'])
self.assertEqual("UX", f['error_code'])
@@ -301,11 +301,9 @@ class TestUsageHandler(unittest.TestCase):
env = {'stream_id': 123}
raw = [{'event_type': 'foo'}]
events = self.handler.handle_events(raw, env)
- self.assertEqual(1, len(events))
- notifications = env['usage_notifications']
- self.assertEqual(1, len(notifications))
+ self.assertEqual(2, len(events))
self.assertEqual("compute.instance.exists.failed",
- notifications[0]['event_type'])
+ events[-1]['event_type'])
@mock.patch.object(pipeline_handler.UsageHandler, '_process_block')
def test_handle_events_exists(self, pb):
@@ -325,9 +323,7 @@ class TestUsageHandler(unittest.TestCase):
{'event_type': 'foo'},
]
events = self.handler.handle_events(raw, env)
- self.assertEqual(3, len(events))
- notifications = env['usage_notifications']
- self.assertEqual(1, len(notifications))
+ self.assertEqual(4, len(events))
self.assertEqual("compute.instance.exists.failed",
- notifications[0]['event_type'])
+ events[-1]['event_type'])
self.assertTrue(pb.called)
diff --git a/winchester/db/interface.py b/winchester/db/interface.py
index 1f3064b..4220604 100644
--- a/winchester/db/interface.py
+++ b/winchester/db/interface.py
@@ -143,7 +143,12 @@ class DBInterface(object):
event_type = self.get_event_type(event_type, session=session)
e = models.Event(message_id, event_type, generated)
for name in traits:
- e[name] = traits[name]
+ try:
+ e[name] = traits[name]
+ except models.InvalidTraitType:
+ logger.error("Invalid trait for %s "
+ "(%s) %s" % (name, traits[name],
+ type(traits[name])))
session.add(e)
@sessioned
diff --git a/winchester/models.py b/winchester/models.py
index 318641e..cf0de67 100644
--- a/winchester/models.py
+++ b/winchester/models.py
@@ -160,6 +160,7 @@ class PolymorphicVerticalProperty(object):
ATTRIBUTE_MAP = {Datatype.none: None}
PY_TYPE_MAP = {unicode: Datatype.string,
int: Datatype.int,
+ long: Datatype.int,
float: Datatype.float,
datetime: Datatype.datetime,
DBTimeRange: Datatype.timerange}
diff --git a/winchester/pipeline_handler.py b/winchester/pipeline_handler.py
index 93ff32e..a7ce7d3 100644
--- a/winchester/pipeline_handler.py
+++ b/winchester/pipeline_handler.py
@@ -15,12 +15,17 @@
# limitations under the License.
import abc
+import collections
import datetime
+import fnmatch
+import json
import logging
import six
+import time
import uuid
from notabene import kombu_driver as driver
+import requests
logger = logging.getLogger(__name__)
@@ -358,33 +363,54 @@ class UsageHandler(PipelineHandlerBase):
self._confirm_delete(exists, deleted, delete_fields)
def _base_notification(self, exists):
- apb, ape = self._get_audit_period(exists)
- return {
- 'payload': {
- 'audit_period_beginning': str(apb),
- 'audit_period_ending': str(ape),
- 'launched_at': str(exists.get('launched_at', '')),
- 'deleted_at': str(exists.get('deleted_at', '')),
- 'instance_id': exists.get('instance_id', ''),
- 'tenant_id': exists.get('tenant_id', ''),
- 'display_name': exists.get('display_name', ''),
- 'instance_type': exists.get('instance_flavor', ''),
- 'instance_flavor_id': exists.get('instance_flavor_id', ''),
- 'state': exists.get('state', ''),
- 'state_description': exists.get('state_description', ''),
- 'bandwidth': {'public': {
- 'bw_in': exists.get('bandwidth_in', 0),
- 'bw_out': exists.get('bandwidth_out', 0)}},
- 'image_meta': {
- 'org.openstack__1__architecture': exists.get(
- 'os_architecture', ''),
- 'org.openstack__1__os_version': exists.get('os_version',
- ''),
- 'org.openstack__1__os_distro': exists.get('os_distro', ''),
- 'org.rackspace__1__options': exists.get('rax_options', '0')
- }
- },
- 'original_message_id': exists.get('message_id', '')}
+ basen = exists.copy()
+ if 'bandwidth_in' not in basen:
+ basen['bandwidth_in'] = 0
+ if 'bandwidth_out' not in basen:
+ basen['bandwidth_out'] = 0
+ if 'rax_options' not in basen:
+ basen['rax_options'] = '0'
+ basen['original_message_id'] = exists.get('message_id', '')
+ return basen
+# apb, ape = self._get_audit_period(exists)
+# return {
+# 'payload': {
+# 'audit_period_beginning': str(apb),
+# 'audit_period_ending': str(ape),
+# 'launched_at': str(exists.get('launched_at', '')),
+# 'deleted_at': str(exists.get('deleted_at', '')),
+# 'instance_id': exists.get('instance_id', ''),
+# 'tenant_id': exists.get('tenant_id', ''),
+# 'display_name': exists.get('display_name', ''),
+# 'instance_type': exists.get('instance_flavor', ''),
+# 'instance_flavor_id': exists.get('instance_flavor_id', ''),
+# 'state': exists.get('state', ''),
+# 'state_description': exists.get('state_description', ''),
+# 'bandwidth': {'public': {
+# 'bw_in': exists.get('bandwidth_in', 0),
+# 'bw_out': exists.get('bandwidth_out', 0)}},
+# 'image_meta': {
+# 'org.openstack__1__architecture': exists.get(
+# 'os_architecture', ''),
+# 'org.openstack__1__os_version': exists.get('os_version',
+# ''),
+# 'org.openstack__1__os_distro': exists.get('os_distro', ''),
+# 'org.rackspace__1__options': exists.get('rax_options', '0')
+# }
+# },
+# 'original_message_id': exists.get('message_id', '')}
+
+ def _generate_new_id(self, original_message_id, event_type):
+ # Generate message_id for new events deterministically from
+ # the original message_id and event type using uuid5 algo.
+ # This will allow any dups to be caught by message_id. (mdragon)
+ if original_message_id:
+ oid = uuid.UUID(original_message_id)
+ return uuid.uuid5(oid, event_type)
+ else:
+ logger.error("Generating %s, but origional message missing"
+ " origional_message_id." % event_type)
+ return uuid.uuid4()
def _process_block(self, block, exists):
error = None
@@ -422,12 +448,13 @@ class UsageHandler(PipelineHandlerBase):
datetime.datetime.utcnow()),
'stream_id': int(self.stream_id),
'instance_id': instance_id,
- 'warnings': self.warnings}
+ 'warnings': ', '.join(self.warnings)}
events.append(warning_event)
new_event = self._base_notification(exists)
+ new_event['message_id'] = self._generate_new_id(
+ new_event['original_message_id'], event_type)
new_event.update({'event_type': event_type,
- 'message_id': str(uuid.uuid4()),
'publisher_id': 'stv3',
'timestamp': exists.get('timestamp',
datetime.datetime.utcnow()),
@@ -466,11 +493,299 @@ class UsageHandler(PipelineHandlerBase):
}
new_events.append(new_event)
- env['usage_notifications'] = new_events
- return events
+ return events + new_events
def commit(self):
pass
def rollback(self):
pass
+
+
+class AtomPubException(Exception):
+ pass
+
+
+cuf_template = ("""""")
+
+
+class AtomPubHandler(PipelineHandlerBase):
+ auth_token_cache = None
+
+ def __init__(self, url, event_types=None, extra_info=None,
+ auth_user='', auth_key='', auth_server='',
+ wait_interval=30, max_wait=600, http_timeout=120, **kw):
+ super(AtomPubHandler, self).__init__(**kw)
+ self.events = []
+ self.included_types = []
+ self.excluded_types = []
+ self.url = url
+ self.auth_user = auth_user
+ self.auth_key = auth_key
+ self.auth_server = auth_server
+ self.wait_interval = wait_interval
+ self.max_wait = max_wait
+ self.http_timeout = http_timeout
+ if extra_info:
+ self.extra_info = extra_info
+ else:
+ self.extra_info = {}
+
+ if event_types:
+ if isinstance(event_types, six.string_types):
+ event_types = [event_types]
+ for t in event_types:
+ if t.startswith('!'):
+ self.excluded_types.append(t[1:])
+ else:
+ self.included_types.append(t)
+ else:
+ self.included_types.append('*')
+ if self.excluded_types and not self.included_types:
+ self.included_types.append('*')
+
+ def _included_type(self, event_type):
+ return any(fnmatch.fnmatch(event_type, t) for t in self.included_types)
+
+ def _excluded_type(self, event_type):
+ return any(fnmatch.fnmatch(event_type, t) for t in self.excluded_types)
+
+ def match_type(self, event_type):
+ return (self._included_type(event_type)
+ and not self._excluded_type(event_type))
+
+ def handle_events(self, events, env):
+ for event in events:
+ event_type = event['event_type']
+ if self.match_type(event_type):
+ self.events.append(event)
+ logger.debug("Matched %s events." % len(self.events))
+ return events
+
+ def commit(self):
+ for event in self.events:
+ event_type = event.get('event_type', '')
+ message_id = event.get('message_id', '')
+ try:
+ status = self.publish_event(event)
+ logger.debug("Sent %s event %s. Status %s" % (event_type,
+ message_id,
+ status))
+ except Exception:
+ original_message_id = event.get('original_message_id', '')
+ logger.exception("Error publishing %s event %s "
+ "(original id: %s)!" % (event_type,
+ message_id,
+ original_message_id))
+
+ def publish_event(self, event):
+ content, content_type = self.format_cuf_xml(event)
+ event_type = self.event_type_cuf_xml(event.get('event_type'))
+ atom = self.generate_atom(event, event_type, content, content_type)
+
+ logger.debug("Publishing event: %s" % atom)
+ return self._send_event(atom)
+
+ def generate_atom(self, event, event_type, content, content_type):
+ template = (""""""
+ """urn:uuid:%(message_id)s"""
+ """"""
+ """"""
+ """Server"""
+ """%(content)s"""
+ """""")
+ info = dict(message_id=event.get('message_id'),
+ original_message_id=event.get('original_message_id'),
+ event=event,
+ event_type=event_type,
+ content=content,
+ content_type=content_type)
+ return template % info
+
+ def event_type_cuf_xml(self, event_type):
+ return event_type + ".cuf"
+
+ def format_cuf_xml(self, event):
+ tvals = collections.defaultdict(lambda: '')
+ tvals.update(event)
+ tvals.update(self.extra_info)
+ start_time, end_time = self._get_times(event)
+ tvals['start_time'] = self._format_time(start_time)
+ tvals['end_time'] = self._format_time(end_time)
+ tvals['status'] = self._get_status(event)
+ tvals['options'] = self._get_options(event)
+ c = cuf_template % tvals
+ return (c, 'application/xml')
+
+ def _get_options(self, event):
+ opt = int(event.get('rax_options', 0))
+ flags = [bool(opt & (2**i)) for i in range(8)]
+ os = 'LINUX'
+ app = None
+ if flags[0]:
+ os = 'RHEL'
+ if flags[2]:
+ os = 'WINDOWS'
+ if flags[6]:
+ os = 'VYATTA'
+ if flags[3]:
+ app = 'MSSQL'
+ if flags[5]:
+ app = 'MSSQL_WEB'
+ if app is None:
+ return 'osLicenseType="%s"' % os
+ else:
+ return 'osLicenseType="%s" applicationLicense="%s"' % (os, app)
+
+ def _get_status(self, event):
+ state = event.get('state')
+ state_description = event.get('state_description')
+ status = 'UNKNOWN'
+ status_map = {
+ "building": 'BUILD',
+ "stopped": 'SHUTOFF',
+ "paused": 'PAUSED',
+ "suspended": 'SUSPENDED',
+ "rescued": 'RESCUE',
+ "error": 'ERROR',
+ "deleted": 'DELETED',
+ "soft-delete": 'SOFT_DELETED',
+ "shelved": 'SHELVED',
+ "shelved_offloaded": 'SHELVED_OFFLOADED',
+ }
+ if state in status_map:
+ status = status_map[state]
+ if state == 'resized':
+ if state_description == 'resize_reverting':
+ status = 'REVERT_RESIZE'
+ else:
+ status = 'VERIFY_RESIZE'
+ if state == 'active':
+ active_map = {
+ "rebooting": 'REBOOT',
+ "rebooting_hard": 'HARD_REBOOT',
+ "updating_password": 'PASSWORD',
+ "rebuilding": 'REBUILD',
+ "rebuild_block_device_mapping": 'REBUILD',
+ "rebuild_spawning": 'REBUILD',
+ "migrating": 'MIGRATING',
+ "resize_prep": 'RESIZE',
+ "resize_migrating": 'RESIZE',
+ "resize_migrated": 'RESIZE',
+ "resize_finish": 'RESIZE',
+ }
+ status = active_map.get(state_description, 'ACTIVE')
+ if status == 'UNKNOWN':
+ logger.error("Unknown status for event %s: state %s (%s)" % (
+ event.get('message_id'), state, state_description))
+ return status
+
+ def _get_times(self, event):
+ audit_period_beginning = event.get('audit_period_beginning')
+ audit_period_ending = event.get('audit_period_ending')
+ launched_at = event.get('launched_at')
+ terminated_at = event.get('terminated_at')
+ if not terminated_at:
+ terminated_at = event.get('deleted_at')
+
+ start_time = max(launched_at, audit_period_beginning)
+ if not terminated_at:
+ end_time = audit_period_ending
+ else:
+ end_time = min(terminated_at, audit_period_ending)
+ if start_time > end_time:
+ start_time = audit_period_beginning
+ return (start_time, end_time)
+
+ def _format_time(self, dt):
+ time_format = "%Y-%m-%dT%H:%M:%SZ"
+ if dt:
+ return datetime.datetime.strftime(dt, time_format)
+ else:
+ return ''
+
+ def _get_auth(self, force=False, headers=None):
+ if headers is None:
+ headers = {}
+ if force or not AtomPubHandler.auth_token_cache:
+ auth_body = {"auth": {
+ "RAX-KSKEY:apiKeyCredentials": {
+ "username": self.auth_user,
+ "apiKey": self.auth_key,
+ }}}
+ auth_headers = {"User-Agent": "Winchester",
+ "Accept": "application/json",
+ "Content-Type": "application/json"}
+ logger.debug("Contacting auth server %s" % self.auth_server)
+ res = requests.post(self.auth_server,
+ data=json.dumps(auth_body),
+ headers=auth_headers)
+ res.raise_for_status()
+ token = res.json()["access"]["token"]["id"]
+ logger.debug("Token received: %s" % token)
+ AtomPubHandler.auth_token_cache = token
+ headers["X-Auth-Token"] = AtomPubHandler.auth_token_cache
+ return headers
+
+ def _send_event(self, atom):
+ headers = {"Content-Type": "application/atom+xml"}
+ headers = self._get_auth(headers=headers)
+ attempts = 0
+ status = 0
+ while True:
+ try:
+ res = requests.post(self.url,
+ data=atom,
+ headers=headers,
+ timeout=self.http_timeout)
+ status = res.status_code
+ if status >= 200 and status < 300:
+ break
+ if status == 401:
+ logger.info("Auth expired, reauthorizing...")
+ headers = self._get_auth(headers=headers, force=True)
+ continue
+ if status == 409:
+ # they already have this. No need to retry. (mdragon)
+ logger.debug("Duplicate message: \n%s" % atom)
+ break
+ if status == 400:
+ # AtomPub server won't accept content.
+ logger.error("Invalid Content: Server rejected content: "
+ "\n%s" % atom)
+ break
+ except requests.exceptions.ConnectionError:
+ logger.exception("Connection error talking to %s" % self.url)
+ except requests.exceptions.Timeout:
+ logger.exception("HTTP timeout talking to %s" % self.url)
+ except requests.exceptions.HTTPError:
+ logger.exception("HTTP protocol error talking to "
+ "%s" % self.url)
+ except requests.exceptions.RequestException:
+ logger.exception("Unknown exeption talking to %s" % self.url)
+ # If we got here, something went wrong
+ attempts += 1
+ wait = min(attempts * self.wait_interval, self.max_wait)
+ logger.error("Message delivery failed, going to sleep, will "
+ "try again in %s seconds" % str(wait))
+ time.sleep(wait)
+ return status
+
+ def rollback(self):
+ pass
diff --git a/winchester/trigger_manager.py b/winchester/trigger_manager.py
index 6ba0910..04ab9a3 100644
--- a/winchester/trigger_manager.py
+++ b/winchester/trigger_manager.py
@@ -193,14 +193,18 @@ class TriggerManager(object):
return self.time_sync.current_time()
def save_event(self, event):
- traits = event.copy()
+ traits = {}
try:
- message_id = traits.pop('message_id')
- timestamp = traits.pop('timestamp')
- event_type = traits.pop('event_type')
+ message_id = event['message_id']
+ timestamp = event['timestamp']
+ event_type = event['event_type']
except KeyError as e:
logger.warning("Received invalid event: %s" % e)
return False
+ for key, val in event.items():
+ if key not in ('message_id', 'timestamp', 'event_type'):
+ if val is not None:
+ traits[key] = val
try:
self.db.create_event(message_id, event_type,
timestamp, traits)