Add PEP8 check and fix related issues

- Add PEP8 section to tox.ini
- Add hacking to requirements to enforce OpenStack style requirements
- Fix large number of formatting issues flagged by flake8 check
- Add copyright notices to all remaining files
- Fix bug in trigger_manager related to logging calls
- Add .gitignore file

Change-Id: I755ab9c8bcc436836f9006fcd671408cc77214c4
This commit is contained in:
Levi Blackstone 2015-05-01 10:49:37 -05:00
parent a2710a98d9
commit 8892801205
26 changed files with 1356 additions and 893 deletions

22
.gitignore vendored
View File

@ -1,10 +1,5 @@
*.py[cod]
AUTHORS
Changelog
# C extensions
*.so
*.py[co]
*.swp
# Packages
*.egg
@ -17,17 +12,22 @@ var
sdist
develop-eggs
.installed.cfg
lib
lib64
__pycache__
# Installer logs
pip-log.txt
# Unit test / coverage reports
test-reporting-results*
.coverage
.tox
nosetests.xml
# Translations
*.mo
#Mr Developer
.mr.developer.cfg
# IDE Project Files
*.project
*.pydev*
*.idea

View File

@ -1,3 +1,4 @@
hacking>=0.10.0,<0.11
simport
stackdistiller
timex

View File

@ -1,12 +1,28 @@
#for Python2.6 compatability.
# Copyright (c) 2014 Dark Secret Software Inc.
# Copyright (c) 2015 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# for Python2.6 compatability.
import unittest2 as unittest
import mock
import logging
import datetime
import timex
from winchester import db, models
from winchester import db
from winchester import models
logging.basicConfig()
@ -15,91 +31,99 @@ TEST_DATA = [
dict(id=1, desc='test.thing.begin'),
dict(id=2, desc='test.thing.end'),
dict(id=3, desc='test.otherthing.foo'),
]},
]},
{'event': [
dict(id=1,
message_id='1234-5678-001',
generated=datetime.datetime(2014,8,1,10,20,45,453201),
event_type_id=1,),
message_id='1234-5678-001',
generated=datetime.datetime(2014, 8, 1, 10, 20, 45, 453201),
event_type_id=1, ),
dict(id=2,
message_id='1234-5678-002',
generated=datetime.datetime(2014,8,1,15,25,45,453201),
event_type_id=2,),
message_id='1234-5678-002',
generated=datetime.datetime(2014, 8, 1, 15, 25, 45, 453201),
event_type_id=2, ),
dict(id=3,
message_id='1234-5678-003',
generated=datetime.datetime(2014,8,1,2,10,12,0),
event_type_id=3,),
message_id='1234-5678-003',
generated=datetime.datetime(2014, 8, 1, 2, 10, 12, 0),
event_type_id=3, ),
dict(id=4,
message_id='1234-5678-004',
generated=datetime.datetime(2014,8,1,4,57,55,42),
event_type_id=3,),
]},
message_id='1234-5678-004',
generated=datetime.datetime(2014, 8, 1, 4, 57, 55, 42),
event_type_id=3, ),
]},
{'trait': [
dict(event_id=1, name='instance_id', type=int(models.Datatype.string),
t_string='aaaa-bbbb-cccc-dddd'),
t_string='aaaa-bbbb-cccc-dddd'),
dict(event_id=1, name='memory_mb', type=int(models.Datatype.int),
t_int=1024),
t_int=1024),
dict(event_id=1, name='test_weight', type=int(models.Datatype.float),
t_float=20112.42),
dict(event_id=1, name='launched_at', type=int(models.Datatype.datetime),
t_datetime=datetime.datetime(2014,7,1,2,30,45,453201)),
]},
t_float=20112.42),
dict(event_id=1, name='launched_at',
type=int(models.Datatype.datetime),
t_datetime=datetime.datetime(2014, 7, 1, 2, 30, 45, 453201)),
]},
{'stream': [
dict(id=1, first_event=datetime.datetime(2014,8,1,2,10,12,0),
last_event=datetime.datetime(2014,8,1,4,57,55,42),
name='test_trigger',
expire_timestamp=datetime.datetime(2014,8,2,4,57,55,42),
state=int(models.StreamState.active),
state_serial_no=0),
dict(id=2, first_event=datetime.datetime(2014,8,1,15,25,45,453201),
last_event=datetime.datetime(2014,8,1,15,25,45,453201),
name='another_test_trigger',
expire_timestamp=datetime.datetime(2014,8,2,4,57,55,42),
state=int(models.StreamState.active),
state_serial_no=0),
dict(id=3, first_event=datetime.datetime(2014,8,1,15,25,45,453201),
last_event=datetime.datetime(2014,8,1,15,25,45,453201),
name='fire_test_trigger',
fire_timestamp=datetime.datetime(2014,8,10,6,0,0,42),
expire_timestamp=datetime.datetime(2014,8,15,6,0,0,42),
state=int(models.StreamState.active),
state_serial_no=0),
dict(id=4, first_event=datetime.datetime(2014,8,1,15,25,45,453201),
last_event=datetime.datetime(2014,8,1,15,25,45,453201),
name='fire_test_trigger',
fire_timestamp=datetime.datetime(2014,8,11,6,0,0,42),
expire_timestamp=datetime.datetime(2014,8,16,0,0,0,42),
state=int(models.StreamState.active),
state_serial_no=0),
dict(id=5, first_event=datetime.datetime(2014,8,1,15,25,45,453201),
last_event=datetime.datetime(2014,8,1,15,25,45,453201),
name='reset_test_trigger',
fire_timestamp=datetime.datetime(2014,8,11,6,0,0,42),
expire_timestamp=datetime.datetime(2014,8,16,0,0,0,42),
state=int(models.StreamState.error),
state_serial_no=0),
dict(id=6, first_event=datetime.datetime(2014,8,1,15,25,45,453201),
last_event=datetime.datetime(2014,8,1,15,25,45,453201),
name='reset_test_trigger',
fire_timestamp=datetime.datetime(2014,8,11,6,0,0,42),
expire_timestamp=datetime.datetime(2014,8,16,0,0,0,42),
state=int(models.StreamState.expire_error),
state_serial_no=0),
dict(id=7, first_event=datetime.datetime(2014,8,1,15,25,45,453201),
last_event=datetime.datetime(2014,8,1,15,25,45,453201),
name='reset_test_trigger',
fire_timestamp=datetime.datetime(2014,8,11,6,0,0,42),
expire_timestamp=datetime.datetime(2014,8,16,0,0,0,42),
state=int(models.StreamState.retry_fire),
state_serial_no=0),
dict(id=8, first_event=datetime.datetime(2014,8,1,15,25,45,453201),
last_event=datetime.datetime(2014,8,1,15,25,45,453201),
name='reset_test_trigger',
fire_timestamp=datetime.datetime(2014,8,11,6,0,0,42),
expire_timestamp=datetime.datetime(2014,8,16,0,0,0,42),
state=int(models.StreamState.retry_expire),
state_serial_no=0),
]},
dict(id=1, first_event=datetime.datetime(2014, 8, 1, 2, 10, 12, 0),
last_event=datetime.datetime(2014, 8, 1, 4, 57, 55, 42),
name='test_trigger',
expire_timestamp=datetime.datetime(2014, 8, 2, 4, 57, 55, 42),
state=int(models.StreamState.active),
state_serial_no=0),
dict(id=2,
first_event=datetime.datetime(2014, 8, 1, 15, 25, 45, 453201),
last_event=datetime.datetime(2014, 8, 1, 15, 25, 45, 453201),
name='another_test_trigger',
expire_timestamp=datetime.datetime(2014, 8, 2, 4, 57, 55, 42),
state=int(models.StreamState.active),
state_serial_no=0),
dict(id=3,
first_event=datetime.datetime(2014, 8, 1, 15, 25, 45, 453201),
last_event=datetime.datetime(2014, 8, 1, 15, 25, 45, 453201),
name='fire_test_trigger',
fire_timestamp=datetime.datetime(2014, 8, 10, 6, 0, 0, 42),
expire_timestamp=datetime.datetime(2014, 8, 15, 6, 0, 0, 42),
state=int(models.StreamState.active),
state_serial_no=0),
dict(id=4,
first_event=datetime.datetime(2014, 8, 1, 15, 25, 45, 453201),
last_event=datetime.datetime(2014, 8, 1, 15, 25, 45, 453201),
name='fire_test_trigger',
fire_timestamp=datetime.datetime(2014, 8, 11, 6, 0, 0, 42),
expire_timestamp=datetime.datetime(2014, 8, 16, 0, 0, 0, 42),
state=int(models.StreamState.active),
state_serial_no=0),
dict(id=5,
first_event=datetime.datetime(2014, 8, 1, 15, 25, 45, 453201),
last_event=datetime.datetime(2014, 8, 1, 15, 25, 45, 453201),
name='reset_test_trigger',
fire_timestamp=datetime.datetime(2014, 8, 11, 6, 0, 0, 42),
expire_timestamp=datetime.datetime(2014, 8, 16, 0, 0, 0, 42),
state=int(models.StreamState.error),
state_serial_no=0),
dict(id=6,
first_event=datetime.datetime(2014, 8, 1, 15, 25, 45, 453201),
last_event=datetime.datetime(2014, 8, 1, 15, 25, 45, 453201),
name='reset_test_trigger',
fire_timestamp=datetime.datetime(2014, 8, 11, 6, 0, 0, 42),
expire_timestamp=datetime.datetime(2014, 8, 16, 0, 0, 0, 42),
state=int(models.StreamState.expire_error),
state_serial_no=0),
dict(id=7,
first_event=datetime.datetime(2014, 8, 1, 15, 25, 45, 453201),
last_event=datetime.datetime(2014, 8, 1, 15, 25, 45, 453201),
name='reset_test_trigger',
fire_timestamp=datetime.datetime(2014, 8, 11, 6, 0, 0, 42),
expire_timestamp=datetime.datetime(2014, 8, 16, 0, 0, 0, 42),
state=int(models.StreamState.retry_fire),
state_serial_no=0),
dict(id=8,
first_event=datetime.datetime(2014, 8, 1, 15, 25, 45, 453201),
last_event=datetime.datetime(2014, 8, 1, 15, 25, 45, 453201),
name='reset_test_trigger',
fire_timestamp=datetime.datetime(2014, 8, 11, 6, 0, 0, 42),
expire_timestamp=datetime.datetime(2014, 8, 16, 0, 0, 0, 42),
state=int(models.StreamState.retry_expire),
state_serial_no=0),
]},
{'streamevent': [
dict(stream_id=1, event_id=3),
dict(stream_id=1, event_id=4),
@ -107,30 +131,32 @@ TEST_DATA = [
dict(stream_id=3, event_id=2),
dict(stream_id=3, event_id=1),
dict(stream_id=4, event_id=2),
]},
]},
{'dist_trait': [
dict(stream_id=1, name='instance_id', type=int(models.Datatype.string),
dt_string='zzzz-xxxx-yyyy-wwww'),
dt_string='zzzz-xxxx-yyyy-wwww'),
dict(stream_id=1, name='memory_mb', type=int(models.Datatype.int),
dt_int=4096),
dt_int=4096),
dict(stream_id=1, name='test_weight', type=int(models.Datatype.float),
dt_float=3.1415),
dict(stream_id=1, name='launched_at', type=int(models.Datatype.datetime),
dt_datetime=datetime.datetime(2014,7,8,9,40,50,77777)),
dict(stream_id=1, name='timestamp', type=int(models.Datatype.timerange),
dt_timerange_begin=datetime.datetime(2014,7,8,0,0,0,27),
dt_timerange_end=datetime.datetime(2014,7,9,0,0,0,27)),
]},
dt_float=3.1415),
dict(stream_id=1, name='launched_at',
type=int(models.Datatype.datetime),
dt_datetime=datetime.datetime(2014, 7, 8, 9, 40, 50, 77777)),
dict(stream_id=1, name='timestamp',
type=int(models.Datatype.timerange),
dt_timerange_begin=datetime.datetime(2014, 7, 8, 0, 0, 0, 27),
dt_timerange_end=datetime.datetime(2014, 7, 9, 0, 0, 0, 27)),
]},
]
def create_tables(dbi):
#used for tests
# used for tests
models.Base.metadata.create_all(dbi.engine)
def load_fixture_data(dbi, data):
#Used for tests. This is fugly, refactor later (mdragon)
# Used for tests. This is fugly, refactor later (mdragon)
for table in data:
for table_name, rows in table.items():
for row in rows:
@ -168,16 +194,16 @@ class TestDB(unittest.TestCase):
t = self.db.get_event_type('test.thing.begin')
self.assertEqual(t.id, 1)
t = self.db.get_event_type('test.not_in_db')
self.assertEqual(t.id, 4) #next unused id.
self.assertEqual(t.id, 4) # next unused id.
def test_create_event(self):
message_id = '9876-0001-0001'
event_type = 'test.thing.begin'
timestamp = datetime.datetime(2014,7,4,12,7,21,4096)
timestamp = datetime.datetime(2014, 7, 4, 12, 7, 21, 4096)
traits = dict(test_string='foobar',
test_number=42,
test_float=3.1415,
test_date=datetime.datetime(2014,7,1,0,0,0,0),
test_date=datetime.datetime(2014, 7, 1, 0, 0, 0, 0),
somevalue=u'A fine test string')
self.db.create_event(message_id, event_type, timestamp, traits)
event = self.db.get_event_by_message_id(message_id)
@ -196,11 +222,11 @@ class TestDB(unittest.TestCase):
def test_create_event_duplicate(self):
message_id = '9876-0001-0001'
event_type = 'test.thing.begin'
timestamp = datetime.datetime(2014,7,4,12,7,21,4096)
timestamp = datetime.datetime(2014, 7, 4, 12, 7, 21, 4096)
traits = dict(test_string='foobar',
test_number=42,
test_float=3.1415,
test_date=datetime.datetime(2014,7,1,0,0,0,0),
test_date=datetime.datetime(2014, 7, 1, 0, 0, 0, 0),
somevalue=u'A fine test string')
self.db.create_event(message_id, event_type, timestamp, traits)
with self.assertRaises(db.DuplicateError):
@ -211,16 +237,18 @@ class TestDB(unittest.TestCase):
self.assertEqual(len(event), 7)
expected = dict(message_id='1234-5678-001',
event_type='test.thing.begin',
timestamp=datetime.datetime(2014,8,1,10,20,45,453201),
timestamp=datetime.datetime(2014, 8, 1, 10, 20, 45,
453201),
instance_id='aaaa-bbbb-cccc-dddd',
memory_mb=1024,
test_weight=20112.42,
launched_at=datetime.datetime(2014,7,1,2,30,45,453201),)
launched_at=datetime.datetime(2014, 7, 1, 2, 30, 45,
453201), )
self.assertDictContainsSubset(expected, event)
def test_get_stream_events(self):
stream = self.db.get_stream_by_id(1)
events = self.db.get_stream_events(stream)
events = self.db.get_stream_events(stream)
self.assertEqual(len(events), 2)
self.assertIn('1234-5678-003', [e['message_id'] for e in events])
self.assertIn('1234-5678-004', [e['message_id'] for e in events])
@ -228,37 +256,46 @@ class TestDB(unittest.TestCase):
def test_create_stream(self):
event = dict(message_id='1234-5678-001',
event_type='test.thing.begin',
timestamp=datetime.datetime(2014,8,1,10,20,45,453201),
timestamp=datetime.datetime(2014, 8, 1, 10, 20, 45,
453201),
instance_id='aaaa-bbbb-cccc-dddd',
memory_mb=1024,
test_weight=20112.42,
launched_at=datetime.datetime(2014,7,1,2,30,45,453201),)
timestamp = timex.TimeRange(datetime.datetime(2014,8,1,0,0,0,27),
datetime.datetime(2014,2,2,0,0,0,27))
launched_at=datetime.datetime(2014, 7, 1, 2, 30, 45,
453201), )
timestamp = timex.TimeRange(datetime.datetime(2014, 8, 1, 0, 0, 0, 27),
datetime.datetime(2014, 2, 2, 0, 0, 0, 27))
dist_traits = dict(timestamp=timestamp,
instance_id='aaaa-bbbb-cccc-dddd')
class MockTimestamp(object):
pass
mock_expire_value = datetime.datetime(2014,8,2,12,12,12,12)
mock_expire_value = datetime.datetime(2014, 8, 2, 12, 12, 12, 12)
def mock_time_expr(first, last):
self.assertEqual(first, datetime.datetime(2014,8,1,10,20,45,453201))
self.assertEqual(last, datetime.datetime(2014,8,1,10,20,45,453201))
self.assertEqual(first,
datetime.datetime(2014, 8, 1, 10, 20, 45, 453201))
self.assertEqual(last,
datetime.datetime(2014, 8, 1, 10, 20, 45, 453201))
t = MockTimestamp()
t.timestamp = mock_expire_value
return t
stream = self.db.create_stream('test_create_stream', event, dist_traits, mock_time_expr)
stream = self.db.create_stream('test_create_stream', event,
dist_traits,
mock_time_expr)
self.assertEqual(stream.name, 'test_create_stream')
self.assertEqual(stream.first_event, datetime.datetime(2014,8,1,10,20,45,453201))
self.assertEqual(stream.last_event, datetime.datetime(2014,8,1,10,20,45,453201))
self.assertEqual(stream.first_event,
datetime.datetime(2014, 8, 1, 10, 20, 45, 453201))
self.assertEqual(stream.last_event,
datetime.datetime(2014, 8, 1, 10, 20, 45, 453201))
self.assertEqual(stream.expire_timestamp, mock_expire_value)
self.assertIsNone(stream.fire_timestamp)
self.assertEqual(stream.state, models.StreamState.active)
self.assertEqual(stream.state_serial_no, 0)
self.assertTrue(self.db.stream_has_dist_trait(stream.id, 'timestamp', timestamp))
self.assertTrue(
self.db.stream_has_dist_trait(stream.id, 'timestamp', timestamp))
self.assertTrue(self.db.stream_has_dist_trait(stream.id,
'instance_id',
'aaaa-bbbb-cccc-dddd'))
@ -270,28 +307,34 @@ class TestDB(unittest.TestCase):
stream = self.db.get_stream_by_id(1)
event = dict(message_id='1234-5678-001',
event_type='test.thing.begin',
timestamp=datetime.datetime(2014,8,1,10,20,45,453201),
timestamp=datetime.datetime(2014, 8, 1, 10, 20, 45,
453201),
instance_id='aaaa-bbbb-cccc-dddd',
memory_mb=1024,
test_weight=20112.42,
launched_at=datetime.datetime(2014,7,1,2,30,45,453201),)
launched_at=datetime.datetime(2014, 7, 1, 2, 30, 45,
453201), )
class MockTimestamp(object):
pass
mock_expire_value = datetime.datetime(2014,8,2,12,12,12,12)
mock_expire_value = datetime.datetime(2014, 8, 2, 12, 12, 12, 12)
def mock_time_expr(first, last):
self.assertEqual(first, datetime.datetime(2014,8,1,2,10,12,0))
self.assertEqual(last, datetime.datetime(2014,8,1,10,20,45,453201))
self.assertEqual(first,
datetime.datetime(2014, 8, 1, 2, 10, 12, 0))
self.assertEqual(last,
datetime.datetime(2014, 8, 1, 10, 20, 45, 453201))
t = MockTimestamp()
t.timestamp = mock_expire_value
return t
self.db.add_event_stream(stream, event, mock_time_expr)
self.assertEqual(stream.expire_timestamp, mock_expire_value)
self.assertEqual(stream.first_event, datetime.datetime(2014,8,1,2,10,12,0))
self.assertEqual(stream.last_event, datetime.datetime(2014,8,1,10,20,45,453201))
self.assertEqual(stream.first_event,
datetime.datetime(2014, 8, 1, 2, 10, 12, 0))
self.assertEqual(stream.last_event,
datetime.datetime(2014, 8, 1, 10, 20, 45, 453201))
events = self.db.get_stream_events(stream)
self.assertEqual(len(events), 3)
self.assertIn('1234-5678-001', [e['message_id'] for e in events])
@ -313,17 +356,21 @@ class TestDB(unittest.TestCase):
self.assertEqual(dist_traits['test_weight'], 3.1415)
self.assertEqual(type(dist_traits['test_weight']), float)
self.assertIn('launched_at', dist_traits)
self.assertEqual(dist_traits['launched_at'], datetime.datetime(2014,7,8,9,40,50,77777))
self.assertEqual(dist_traits['launched_at'],
datetime.datetime(2014, 7, 8, 9, 40, 50, 77777))
self.assertEqual(type(dist_traits['launched_at']), datetime.datetime)
self.assertIn('timestamp', dist_traits)
timestamp = dist_traits['timestamp']
self.assertEqual(type(timestamp), timex.TimeRange)
self.assertEqual(timestamp.begin, datetime.datetime(2014,7,8,0,0,0,27))
self.assertEqual(timestamp.end, datetime.datetime(2014,7,9,0,0,0,27))
self.assertEqual(timestamp.begin,
datetime.datetime(2014, 7, 8, 0, 0, 0, 27))
self.assertEqual(timestamp.end,
datetime.datetime(2014, 7, 9, 0, 0, 0, 27))
def test_stream_has_dist_trait(self):
#this mostly tests that the polymorphic trait comparisons are working.
dt = self.db.stream_has_dist_trait(1, 'instance_id', 'zzzz-xxxx-yyyy-wwww')
# this mostly tests that the polymorphic trait comparisons are working.
dt = self.db.stream_has_dist_trait(1, 'instance_id',
'zzzz-xxxx-yyyy-wwww')
self.assertIsNotNone(dt)
self.assertEqual(len(dt), 1)
self.assertIn('instance_id', dt)
@ -341,15 +388,15 @@ class TestDB(unittest.TestCase):
self.assertIn('test_weight', dt)
self.assertEqual(dt['test_weight'], 3.1415)
launched = datetime.datetime(2014,7,8,9,40,50,77777)
launched = datetime.datetime(2014, 7, 8, 9, 40, 50, 77777)
dt = self.db.stream_has_dist_trait(1, 'launched_at', launched)
self.assertIsNotNone(dt)
self.assertEqual(len(dt), 1)
self.assertIn('launched_at', dt)
self.assertEqual(dt['launched_at'], launched)
self.assertEqual(dt['launched_at'], launched)
timestamp = timex.TimeRange(datetime.datetime(2014,7,8,0,0,0,27),
datetime.datetime(2014,7,9,0,0,0,27))
timestamp = timex.TimeRange(datetime.datetime(2014, 7, 8, 0, 0, 0, 27),
datetime.datetime(2014, 7, 9, 0, 0, 0, 27))
dt = self.db.stream_has_dist_trait(1, 'timestamp', timestamp)
self.assertIsNotNone(dt)
self.assertEqual(len(dt), 1)
@ -358,19 +405,22 @@ class TestDB(unittest.TestCase):
self.assertEqual(dt['timestamp'].end, timestamp.end)
def test_get_active_stream(self):
timestamp = timex.TimeRange(datetime.datetime(2014,7,8,0,0,0,27),
datetime.datetime(2014,7,9,0,0,0,27))
timestamp = timex.TimeRange(datetime.datetime(2014, 7, 8, 0, 0, 0, 27),
datetime.datetime(2014, 7, 9, 0, 0, 0, 27))
dist_traits = dict(instance_id='zzzz-xxxx-yyyy-wwww',
memory_mb=4096,
test_weight=3.1415,
launched_at=datetime.datetime(2014,7,8,9,40,50,77777),
launched_at=datetime.datetime(2014, 7, 8, 9, 40, 50,
77777),
timestamp=timestamp)
current_time = datetime.datetime(2014,8,2,1,0,0,02)
stream = self.db.get_active_stream('test_trigger', dist_traits, current_time)
current_time = datetime.datetime(2014, 8, 2, 1, 0, 0, 2)
stream = self.db.get_active_stream('test_trigger', dist_traits,
current_time)
self.assertIsNotNone(stream)
self.assertEqual(stream.id, 1)
current_time = datetime.datetime(2014,8,3,1,0,0,02)
stream = self.db.get_active_stream('test_trigger', dist_traits, current_time)
current_time = datetime.datetime(2014, 8, 3, 1, 0, 0, 2)
stream = self.db.get_active_stream('test_trigger', dist_traits,
current_time)
self.assertIsNone(stream)
def test_stream_ready_to_fire(self):
@ -381,7 +431,7 @@ class TestDB(unittest.TestCase):
self.assertEqual(stream.fire_timestamp, fire_time)
def test_get_ready_streams_fire(self):
current_time = datetime.datetime(2014,8,12,0,0,0,42)
current_time = datetime.datetime(2014, 8, 12, 0, 0, 0, 42)
streams = self.db.get_ready_streams(10, current_time)
self.assertEqual(len(streams), 3)
stream_ids = [stream.id for stream in streams]
@ -389,18 +439,18 @@ class TestDB(unittest.TestCase):
self.assertIn(4, stream_ids)
self.assertIn(7, stream_ids)
current_time = datetime.datetime(2014,8,10,12,0,0,42)
current_time = datetime.datetime(2014, 8, 10, 12, 0, 0, 42)
streams = self.db.get_ready_streams(10, current_time)
self.assertEqual(len(streams), 1)
stream_ids = [stream.id for stream in streams]
self.assertIn(3, stream_ids)
current_time = datetime.datetime(2014,8,12,0,0,0,42)
current_time = datetime.datetime(2014, 8, 12, 0, 0, 0, 42)
streams = self.db.get_ready_streams(1, current_time)
self.assertEqual(len(streams), 1)
def test_get_ready_streams_expire(self):
current_time = datetime.datetime(2014,8,17,0,0,0,42)
current_time = datetime.datetime(2014, 8, 17, 0, 0, 0, 42)
streams = self.db.get_ready_streams(10, current_time, expire=True)
self.assertEqual(len(streams), 5)
stream_ids = [stream.id for stream in streams]
@ -410,21 +460,22 @@ class TestDB(unittest.TestCase):
self.assertIn(4, stream_ids)
self.assertIn(8, stream_ids)
current_time = datetime.datetime(2014,8,10,12,0,0,42)
current_time = datetime.datetime(2014, 8, 10, 12, 0, 0, 42)
streams = self.db.get_ready_streams(10, current_time, expire=True)
self.assertEqual(len(streams), 2)
stream_ids = [stream.id for stream in streams]
self.assertIn(1, stream_ids)
self.assertIn(2, stream_ids)
current_time = datetime.datetime(2014,8,17,0,0,0,42)
current_time = datetime.datetime(2014, 8, 17, 0, 0, 0, 42)
streams = self.db.get_ready_streams(1, current_time, expire=True)
self.assertEqual(len(streams), 1)
def test_set_stream_state_sucess(self):
stream = self.db.get_stream_by_id(1)
old_serial = stream.state_serial_no
new_stream = self.db.set_stream_state(stream, models.StreamState.firing)
new_stream = self.db.set_stream_state(stream,
models.StreamState.firing)
self.assertEqual(new_stream.state, models.StreamState.firing)
self.assertEqual(new_stream.state_serial_no, old_serial + 1)
@ -491,8 +542,8 @@ class TestDB(unittest.TestCase):
self.assertEqual([{'count': 4}], count)
def test_find_events_date_filter(self):
_from = datetime.datetime(2014,8,1,10)
_to = datetime.datetime(2014,8,1,16)
_from = datetime.datetime(2014, 8, 1, 10)
_to = datetime.datetime(2014, 8, 1, 16)
events = self.db.find_events(from_datetime=_from, to_datetime=_to)
self.assertEqual(2, len(events))
msg_ids = [event['message_id'] for event in events]

View File

@ -1,3 +1,19 @@
# Copyright (c) 2014 Dark Secret Software Inc.
# Copyright (c) 2015 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest2 as unittest
import mock
@ -13,8 +29,8 @@ class TestDebugManager(unittest.TestCase):
def test_get_debugger_none(self):
debugger = self.debug_manager.get_debugger(None)
self.assertEquals("n/a", debugger._name)
self.assertEquals(2, debugger._debug_level)
self.assertEqual("n/a", debugger._name)
self.assertEqual(2, debugger._debug_level)
def test_get_debugger_off(self):
tdef = mock.MagicMock(name="tdef")
@ -22,11 +38,11 @@ class TestDebugManager(unittest.TestCase):
tdef.debug_level = 0
debugger = self.debug_manager.get_debugger(tdef)
self.assertTrue(isinstance(debugger, debugging.NoOpDebugger))
self.assertEquals(debugger,
self.debug_manager._debuggers['my_trigger'])
self.assertEqual(debugger,
self.debug_manager._debuggers['my_trigger'])
debugger2 = self.debug_manager.get_debugger(tdef)
self.assertEquals(debugger, debugger2)
self.assertEqual(debugger, debugger2)
def test_get_debugger_on(self):
tdef = mock.MagicMock(name="tdef")
@ -34,11 +50,11 @@ class TestDebugManager(unittest.TestCase):
tdef.debug_level = 1
debugger = self.debug_manager.get_debugger(tdef)
self.assertTrue(isinstance(debugger, debugging.DetailedDebugger))
self.assertEquals(debugger,
self.debug_manager._debuggers['my_trigger'])
self.assertEqual(debugger,
self.debug_manager._debuggers['my_trigger'])
debugger2 = self.debug_manager.get_debugger(tdef)
self.assertEquals(debugger, debugger2)
self.assertEqual(debugger, debugger2)
def test_dump_group_level1(self):
debugger = mock.MagicMock(name="debugger")
@ -52,7 +68,7 @@ class TestDebugManager(unittest.TestCase):
self.debug_manager.dump_group(debugger, "my_group")
log.info.assert_called_once_with(
"my_group Criteria: 3 checks, 1 passed")
"my_group Criteria: 3 checks, 1 passed")
def test_dump_group_level2(self):
debugger = mock.MagicMock(name="debugger")
@ -66,9 +82,10 @@ class TestDebugManager(unittest.TestCase):
with mock.patch.object(debugging, "logger") as log:
self.debug_manager.dump_group(debugger, "my_group")
self.assertEquals(log.info.call_args_list,
[mock.call("my_group Criteria: 3 checks, 1 passed"),
mock.call(" - foo = 12")])
self.assertEqual(log.info.call_args_list,
[mock.call(
"my_group Criteria: 3 checks, 1 passed"),
mock.call(" - foo = 12")])
def test_dump_counters(self):
debugger = mock.MagicMock(name="debugger")
@ -96,49 +113,51 @@ class TestDebugManager(unittest.TestCase):
with mock.patch.object(self.debug_manager, "dump_group") as grp:
with mock.patch.object(debugging, "logger") as log:
self.debug_manager.dump_debuggers()
self.assertEquals(log.info.call_args_list,
[mock.call("---- Trigger Definition: my_debugger ----"),
mock.call("----------------------------")])
self.assertEqual(
log.info.call_args_list,
[mock.call(
"---- Trigger Definition: my_debugger ----"),
mock.call(
"----------------------------")])
grp.assert_called_once_with(debugger, "my_group")
ctr.assert_called_once_with(debugger)
debugger.reset.assert_called_once_with()
class TestDetailedDebugger(unittest.TestCase):
def setUp(self):
super(TestDetailedDebugger, self).setUp()
self.debugger = debugging.DetailedDebugger("my_debugger", 2)
def test_constructor(self):
with mock.patch("winchester.debugging.DetailedDebugger.reset") \
as reset:
d = debugging.DetailedDebugger("my_debugger", 2)
as reset:
debugging.DetailedDebugger("my_debugger", 2)
reset.assert_called_once_with()
self.assertEquals(self.debugger._name, "my_debugger")
self.assertEquals(self.debugger._debug_level, 2)
self.assertEqual(self.debugger._name, "my_debugger")
self.assertEqual(self.debugger._debug_level, 2)
def test_reset(self):
self.assertEquals(self.debugger._groups, {})
self.assertEquals(self.debugger._counters, {})
self.assertEqual(self.debugger._groups, {})
self.assertEqual(self.debugger._counters, {})
def test_get_group(self):
self.assertEquals(self.debugger._groups, {})
self.assertEqual(self.debugger._groups, {})
g = self.debugger.get_group("foo")
self.assertEquals(g._name, "foo")
self.assertEqual(g._name, "foo")
self.assertTrue(self.debugger._groups['foo'])
def test_bump_counter(self):
self.assertEquals(self.debugger._counters, {})
self.assertEqual(self.debugger._counters, {})
self.debugger.bump_counter("foo")
self.assertEquals(self.debugger._counters['foo'], 1)
self.assertEqual(self.debugger._counters['foo'], 1)
self.debugger.bump_counter("foo", 2)
self.assertEquals(self.debugger._counters['foo'], 3)
self.assertEqual(self.debugger._counters['foo'], 3)
def test_get_debug_level(self):
self.assertEquals(self.debugger.get_debug_level(), 2)
self.assertEqual(self.debugger.get_debug_level(), 2)
class TestNoOpDebugger(unittest.TestCase):
@ -151,14 +170,14 @@ class TestNoOpDebugger(unittest.TestCase):
def test_get_group(self):
g = self.debugger.get_group("foo")
self.assertEquals(g, self.debugger.noop_group)
self.assertEqual(g, self.debugger.noop_group)
def test_bump_counter(self):
self.debugger.bump_counter("foo")
self.debugger.bump_counter("foo", 2)
def test_get_debug_level(self):
self.assertEquals(self.debugger.get_debug_level(), 0)
self.assertEqual(self.debugger.get_debug_level(), 0)
class TestGroup(unittest.TestCase):
@ -167,40 +186,40 @@ class TestGroup(unittest.TestCase):
self.group = debugging.Group("my_group")
def test_constructor(self):
self.assertEquals("my_group", self.group._name)
self.assertEquals(0, self.group._match)
self.assertEquals(0, self.group._mismatch)
self.assertEquals({}, self.group._reasons)
self.assertEqual("my_group", self.group._name)
self.assertEqual(0, self.group._match)
self.assertEqual(0, self.group._mismatch)
self.assertEqual({}, self.group._reasons)
def test_match(self):
self.assertTrue(self.group.match())
self.assertEquals(1, self.group._match)
self.assertEqual(1, self.group._match)
def test_mismatch(self):
self.assertFalse(self.group.mismatch("reason"))
self.assertEquals(1, self.group._mismatch)
self.assertEquals(1, self.group._reasons['reason'])
self.assertEqual(1, self.group._mismatch)
self.assertEqual(1, self.group._reasons['reason'])
def test_check(self):
self.assertTrue(self.group.check(True, "reason"))
self.assertEquals(1, self.group._match)
self.assertEquals(0, self.group._mismatch)
self.assertEquals({}, self.group._reasons)
self.assertEqual(1, self.group._match)
self.assertEqual(0, self.group._mismatch)
self.assertEqual({}, self.group._reasons)
self.assertTrue(self.group.check(True, "reason"))
self.assertEquals(2, self.group._match)
self.assertEquals(0, self.group._mismatch)
self.assertEquals({}, self.group._reasons)
self.assertEqual(2, self.group._match)
self.assertEqual(0, self.group._mismatch)
self.assertEqual({}, self.group._reasons)
self.assertFalse(self.group.check(False, "reason"))
self.assertEquals(2, self.group._match)
self.assertEquals(1, self.group._mismatch)
self.assertEquals(1, self.group._reasons['reason'])
self.assertEqual(2, self.group._match)
self.assertEqual(1, self.group._mismatch)
self.assertEqual(1, self.group._reasons['reason'])
self.assertFalse(self.group.check(False, "reason"))
self.assertEquals(2, self.group._match)
self.assertEquals(2, self.group._mismatch)
self.assertEquals(2, self.group._reasons['reason'])
self.assertEqual(2, self.group._match)
self.assertEqual(2, self.group._mismatch)
self.assertEqual(2, self.group._reasons['reason'])
class TestNoOpGroup(unittest.TestCase):

View File

@ -1,7 +1,21 @@
#for Python2.6 compatability.
import unittest2 as unittest
# Copyright (c) 2014 Dark Secret Software Inc.
# Copyright (c) 2015 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
# for Python2.6 compatability.
import unittest2 as unittest
import datetime
import timex
@ -62,19 +76,19 @@ class TestCriterion(unittest.TestCase):
def test_time_criterion(self):
c = definition.TimeCriterion("day", "foo")
e = dict(timestamp=datetime.datetime(2014,8,1,7,52,31,2),
foo=datetime.datetime(2014,8,1,1,2,0,0))
e = dict(timestamp=datetime.datetime(2014, 8, 1, 7, 52, 31, 2),
foo=datetime.datetime(2014, 8, 1, 1, 2, 0, 0))
self.assertTrue(c.match(e, self.fake_group))
e = dict(timestamp=datetime.datetime(2014,8,1,7,52,31,2),
foo=datetime.datetime(2014,8,2,1,2,0,0))
e = dict(timestamp=datetime.datetime(2014, 8, 1, 7, 52, 31, 2),
foo=datetime.datetime(2014, 8, 2, 1, 2, 0, 0))
self.assertFalse(c.match(e, self.fake_group))
e = dict(timestamp=datetime.datetime(2014,8,1,7,52,31,2),
bar=datetime.datetime(2014,8,1,1,2,0,0))
e = dict(timestamp=datetime.datetime(2014, 8, 1, 7, 52, 31, 2),
bar=datetime.datetime(2014, 8, 1, 1, 2, 0, 0))
self.assertFalse(c.match(e, self.fake_group))
e = dict(timestamp=datetime.datetime(2014,8,1,7,52,31,2),
e = dict(timestamp=datetime.datetime(2014, 8, 1, 7, 52, 31, 2),
message_id='1234-5678',
quux=4,
foo=datetime.datetime(2014,8,1,1,2,0,0))
foo=datetime.datetime(2014, 8, 1, 1, 2, 0, 0))
self.assertTrue(c.match(e, self.fake_group))
@ -129,9 +143,9 @@ class TestCriteria(unittest.TestCase):
def test_match_for_type(self):
config = dict(event_type=["test.foo.*", "!test.wakka.*"])
criteria = definition.Criteria(config)
event1 = dict(event_type = "test.foo.zazz")
event2 = dict(event_type = "test.wakka.zazz")
event3 = dict(event_type = "test.boingy")
event1 = dict(event_type="test.foo.zazz")
event2 = dict(event_type="test.wakka.zazz")
event3 = dict(event_type="test.boingy")
self.assertTrue(criteria.match(event1, self.fake_group))
self.assertFalse(criteria.match(event2, self.fake_group))
self.assertFalse(criteria.match(event3, self.fake_group))
@ -140,13 +154,13 @@ class TestCriteria(unittest.TestCase):
config = dict(timestamp='day($launched_at)')
criteria = definition.Criteria(config)
event1 = dict(event_type='test.thing',
timestamp=datetime.datetime(2014,8,1,17,16,15,14),
launched_at=datetime.datetime(2014,8,1,1,2,3,4))
timestamp=datetime.datetime(2014, 8, 1, 17, 16, 15, 14),
launched_at=datetime.datetime(2014, 8, 1, 1, 2, 3, 4))
event2 = dict(event_type='test.thing',
timestamp=datetime.datetime(2014,8,2,17,16,15,14),
launched_at=datetime.datetime(2014,8,1,1,2,3,4))
timestamp=datetime.datetime(2014, 8, 2, 17, 16, 15, 14),
launched_at=datetime.datetime(2014, 8, 1, 1, 2, 3, 4))
event3 = dict(event_type='test.thing',
timestamp=datetime.datetime(2014,8,2,17,16,15,14))
timestamp=datetime.datetime(2014, 8, 2, 17, 16, 15, 14))
self.assertTrue(criteria.match(event1, self.fake_group))
self.assertFalse(criteria.match(event2, self.fake_group))
self.assertFalse(criteria.match(event3, self.fake_group))
@ -159,42 +173,42 @@ class TestCriteria(unittest.TestCase):
other_trait={'string': 'text here'}))
criteria = definition.Criteria(config)
event1 = dict(event_type='test.thing',
timestamp=datetime.datetime(2014,8,1,17,16,15,14),
launched_at=datetime.datetime(2014,8,1,1,2,3,4),
timestamp=datetime.datetime(2014, 8, 1, 17, 16, 15, 14),
launched_at=datetime.datetime(2014, 8, 1, 1, 2, 3, 4),
some_trait='test',
other_trait='text here',
memory_mb=4096,
test_weight=3.1415)
event2 = dict(event_type='test.thing',
timestamp=datetime.datetime(2014,8,1,17,16,15,14),
launched_at=datetime.datetime(2014,8,1,1,2,3,4),
timestamp=datetime.datetime(2014, 8, 1, 17, 16, 15, 14),
launched_at=datetime.datetime(2014, 8, 1, 1, 2, 3, 4),
some_trait='foo',
other_trait='text here',
memory_mb=4096,
test_weight=3.1415)
event3 = dict(event_type='test.thing',
timestamp=datetime.datetime(2014,8,1,17,16,15,14),
launched_at=datetime.datetime(2014,8,1,1,2,3,4),
timestamp=datetime.datetime(2014, 8, 1, 17, 16, 15, 14),
launched_at=datetime.datetime(2014, 8, 1, 1, 2, 3, 4),
other_trait='text here',
memory_mb=4096,
test_weight=3.1415)
event4 = dict(event_type='test.thing',
timestamp=datetime.datetime(2014,8,1,17,16,15,14),
launched_at=datetime.datetime(2014,8,2,1,2,3,4),
timestamp=datetime.datetime(2014, 8, 1, 17, 16, 15, 14),
launched_at=datetime.datetime(2014, 8, 2, 1, 2, 3, 4),
some_trait='test',
other_trait='text here',
memory_mb=4096,
test_weight=3.1415)
event5 = dict(event_type='test.thing',
timestamp=datetime.datetime(2014,8,1,17,16,15,14),
launched_at=datetime.datetime(2014,8,1,1,2,3,4),
timestamp=datetime.datetime(2014, 8, 1, 17, 16, 15, 14),
launched_at=datetime.datetime(2014, 8, 1, 1, 2, 3, 4),
some_trait='test',
other_trait='text here',
memory_mb=1024,
test_weight=3.1415)
event6 = dict(event_type='test.thing',
timestamp=datetime.datetime(2014,8,1,17,16,15,14),
launched_at=datetime.datetime(2014,8,1,1,2,3,4),
timestamp=datetime.datetime(2014, 8, 1, 17, 16, 15, 14),
launched_at=datetime.datetime(2014, 8, 1, 1, 2, 3, 4),
some_trait='test',
other_trait='text here',
memory_mb=4096,
@ -208,7 +222,6 @@ class TestCriteria(unittest.TestCase):
class TestTriggerDefinition(unittest.TestCase):
def setUp(self):
super(TestTriggerDefinition, self).setUp()
self.debug_manager = debugging.DebugManager()
@ -221,12 +234,12 @@ class TestTriggerDefinition(unittest.TestCase):
self.debug_manager)
with self.assertRaises(definition.DefinitionError):
definition.TriggerDefinition(dict(name='test_trigger',
expiration='$last + 1d'),
expiration='$last + 1d'),
self.debug_manager)
with self.assertRaises(definition.DefinitionError):
definition.TriggerDefinition(dict(name='test_trigger',
expiration='$last + 1d',
fire_pipeline='test_pipeline'),
expiration='$last + 1d',
fire_pipeline='test_pipeline'),
self.debug_manager)
with self.assertRaises(definition.DefinitionError):
definition.TriggerDefinition(
@ -236,12 +249,12 @@ class TestTriggerDefinition(unittest.TestCase):
fire_criteria=[dict(event_type='test.thing')]),
self.debug_manager)
tdef = definition.TriggerDefinition(
dict(name='test_trigger',
expiration='$last + 1d',
fire_pipeline='test_pipeline',
fire_criteria=[dict(event_type='test.thing')],
match_criteria=[dict(event_type='test.*')]),
self.debug_manager)
dict(name='test_trigger',
expiration='$last + 1d',
fire_pipeline='test_pipeline',
fire_criteria=[dict(event_type='test.thing')],
match_criteria=[dict(event_type='test.*')]),
self.debug_manager)
self.assertEqual(len(tdef.distinguished_by), 0)
self.assertEqual(len(tdef.fire_criteria), 1)
self.assertIsInstance(tdef.fire_criteria[0], definition.Criteria)
@ -252,32 +265,32 @@ class TestTriggerDefinition(unittest.TestCase):
def test_match_for_criteria(self):
config = dict(name='test_trigger',
expiration='$last + 1d',
fire_pipeline='test_pipeline',
fire_criteria=[dict(event_type='test.thing')],
match_criteria=[dict(event_type='test.*')])
expiration='$last + 1d',
fire_pipeline='test_pipeline',
fire_criteria=[dict(event_type='test.thing')],
match_criteria=[dict(event_type='test.*')])
tdef = definition.TriggerDefinition(config, self.debug_manager)
event1 = dict(event_type='test.thing')
event2 = dict(event_type='other.thing')
self.assertTrue(tdef.match(event1))
self.assertFalse(tdef.match(event2))
config = dict(name='test_trigger',
expiration='$last + 1d',
fire_pipeline='test_pipeline',
fire_criteria=[dict(event_type='test.thing')],
match_criteria=[dict(event_type='test.*'),
dict(event_type='other.*')])
expiration='$last + 1d',
fire_pipeline='test_pipeline',
fire_criteria=[dict(event_type='test.thing')],
match_criteria=[dict(event_type='test.*'),
dict(event_type='other.*')])
tdef = definition.TriggerDefinition(config, self.debug_manager)
self.assertTrue(tdef.match(event1))
self.assertTrue(tdef.match(event2))
def test_match_for_distinguished_traits(self):
config = dict(name='test_trigger',
expiration='$last + 1d',
distinguished_by=['instance_id'],
fire_pipeline='test_pipeline',
fire_criteria=[dict(event_type='test.thing')],
match_criteria=[dict(event_type='test.*')])
expiration='$last + 1d',
distinguished_by=['instance_id'],
fire_pipeline='test_pipeline',
fire_criteria=[dict(event_type='test.thing')],
match_criteria=[dict(event_type='test.*')])
tdef = definition.TriggerDefinition(config, self.debug_manager)
event1 = dict(event_type='test.thing', instance_id='foo')
event2 = dict(event_type='test.thing')
@ -286,11 +299,11 @@ class TestTriggerDefinition(unittest.TestCase):
def test_get_distinguished_traits(self):
config = dict(name='test_trigger',
expiration='$last + 1d',
distinguished_by=['instance_id'],
fire_pipeline='test_pipeline',
fire_criteria=[dict(event_type='test.thing')],
match_criteria=[dict(event_type='test.*')])
expiration='$last + 1d',
distinguished_by=['instance_id'],
fire_pipeline='test_pipeline',
fire_criteria=[dict(event_type='test.thing')],
match_criteria=[dict(event_type='test.*')])
event1 = dict(event_type='test.thing', instance_id='foo')
tdef = definition.TriggerDefinition(config, self.debug_manager)
mcriteria = tdef.match(event1)
@ -301,21 +314,21 @@ class TestTriggerDefinition(unittest.TestCase):
def test_get_distinguished_traits_with_timeexpression(self):
config = dict(name='test_trigger',
expiration='$last + 1d',
distinguished_by=['instance_id', dict(timestamp='day')],
fire_pipeline='test_pipeline',
fire_criteria=[dict(event_type='test.thing')],
match_criteria=[dict(event_type='test.*')])
expiration='$last + 1d',
distinguished_by=['instance_id', dict(timestamp='day')],
fire_pipeline='test_pipeline',
fire_criteria=[dict(event_type='test.thing')],
match_criteria=[dict(event_type='test.*')])
event1 = dict(event_type='test.thing', instance_id='foo',
timestamp=datetime.datetime(2014,8,1,20,4,23,444))
timestamp=datetime.datetime(2014, 8, 1, 20, 4, 23, 444))
tdef = definition.TriggerDefinition(config, self.debug_manager)
mcriteria = tdef.match(event1)
dt = tdef.get_distinguishing_traits(event1, mcriteria)
self.assertEqual(len(dt), 2)
self.assertIn('instance_id', dt)
self.assertEqual(dt['instance_id'], 'foo')
timerange = timex.TimeRange(datetime.datetime(2014,8,1,0,0,0,0),
datetime.datetime(2014,8,2,0,0,0,0))
timerange = timex.TimeRange(datetime.datetime(2014, 8, 1, 0, 0, 0, 0),
datetime.datetime(2014, 8, 2, 0, 0, 0, 0))
self.assertIn('timestamp', dt)
self.assertIsInstance(dt['timestamp'], timex.TimeRange)
self.assertEqual(dt['timestamp'].begin, timerange.begin)
@ -323,12 +336,13 @@ class TestTriggerDefinition(unittest.TestCase):
def test_get_distinguished_traits_with_map(self):
config = dict(name='test_trigger',
expiration='$last + 1d',
distinguished_by=['instance_id'],
fire_pipeline='test_pipeline',
fire_criteria=[dict(event_type='test.thing')],
match_criteria=[dict(event_type='test.*',
map_distinguished_by=dict(instance_id='other_id'))])
expiration='$last + 1d',
distinguished_by=['instance_id'],
fire_pipeline='test_pipeline',
fire_criteria=[dict(event_type='test.thing')],
match_criteria=[dict(event_type='test.*',
map_distinguished_by=dict(
instance_id='other_id'))])
event1 = dict(event_type='test.thing', instance_id='foo',
other_id='bar')
tdef = definition.TriggerDefinition(config, self.debug_manager)
@ -340,51 +354,51 @@ class TestTriggerDefinition(unittest.TestCase):
def test_get_fire_timestamp(self):
config = dict(name='test_trigger',
expiration='$last + 1d',
distinguished_by=['instance_id'],
fire_pipeline='test_pipeline',
fire_criteria=[dict(event_type='test.thing')],
match_criteria=[dict(event_type='test.*')])
expiration='$last + 1d',
distinguished_by=['instance_id'],
fire_pipeline='test_pipeline',
fire_criteria=[dict(event_type='test.thing')],
match_criteria=[dict(event_type='test.*')])
tdef = definition.TriggerDefinition(config, self.debug_manager)
test_time = datetime.datetime(2014,8,1,20,4,23,444)
test_time_plus_1hr = datetime.datetime(2014,8,1,21,4,23,444)
test_time = datetime.datetime(2014, 8, 1, 20, 4, 23, 444)
test_time_plus_1hr = datetime.datetime(2014, 8, 1, 21, 4, 23, 444)
ft = tdef.get_fire_timestamp(test_time)
self.assertEqual(ft, test_time)
config = dict(name='test_trigger',
expiration='$last + 1d',
fire_delay=3600,
distinguished_by=['instance_id'],
fire_pipeline='test_pipeline',
fire_criteria=[dict(event_type='test.thing')],
match_criteria=[dict(event_type='test.*')])
expiration='$last + 1d',
fire_delay=3600,
distinguished_by=['instance_id'],
fire_pipeline='test_pipeline',
fire_criteria=[dict(event_type='test.thing')],
match_criteria=[dict(event_type='test.*')])
tdef = definition.TriggerDefinition(config, self.debug_manager)
ft = tdef.get_fire_timestamp(test_time)
self.assertEqual(ft, test_time_plus_1hr)
def test_should_fire(self):
config = dict(name='test_trigger',
expiration='$last + 1d',
distinguished_by=['instance_id'],
fire_pipeline='test_pipeline',
fire_criteria=[dict(event_type='test.thing')],
match_criteria=[dict(event_type='test.*')])
expiration='$last + 1d',
distinguished_by=['instance_id'],
fire_pipeline='test_pipeline',
fire_criteria=[dict(event_type='test.thing')],
match_criteria=[dict(event_type='test.*')])
tdef = definition.TriggerDefinition(config, self.debug_manager)
events1 = [ dict(event_type='test.foobar'),
dict(event_type='test.thing'),
dict(event_type='test.thing')]
events2 = [ dict(event_type='test.foobar'),
dict(event_type='test.thing')]
events3 = [ dict(event_type='test.foobar'),
dict(event_type='test.whatsit')]
events1 = [dict(event_type='test.foobar'),
dict(event_type='test.thing'),
dict(event_type='test.thing')]
events2 = [dict(event_type='test.foobar'),
dict(event_type='test.thing')]
events3 = [dict(event_type='test.foobar'),
dict(event_type='test.whatsit')]
self.assertTrue(tdef.should_fire(events1))
self.assertTrue(tdef.should_fire(events2))
self.assertFalse(tdef.should_fire(events3))
config = dict(name='test_trigger',
expiration='$last + 1d',
distinguished_by=['instance_id'],
fire_pipeline='test_pipeline',
fire_criteria=[dict(event_type='test.thing', number=2)],
match_criteria=[dict(event_type='test.*')])
expiration='$last + 1d',
distinguished_by=['instance_id'],
fire_pipeline='test_pipeline',
fire_criteria=[dict(event_type='test.thing', number=2)],
match_criteria=[dict(event_type='test.*')])
tdef = definition.TriggerDefinition(config, self.debug_manager)
self.assertTrue(tdef.should_fire(events1))
self.assertFalse(tdef.should_fire(events2))

View File

@ -1,6 +1,21 @@
# Copyright (c) 2014 Dark Secret Software Inc.
# Copyright (c) 2015 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest2 as unittest
import datetime
import mock
from winchester import pipeline_handler
@ -17,26 +32,25 @@ class TestConnectionManager(unittest.TestCase):
cd, ct, ed, et = self.mgr._extract_params({'exchange': 'my_exchange'})
self.assertEquals(cd, {'host': 'localhost',
'port': 5672,
'user': 'guest',
'password': 'guest',
'library': 'librabbitmq',
'vhost': '/'})
self.assertEqual(cd, {'host': 'localhost',
'port': 5672,
'user': 'guest',
'password': 'guest',
'library': 'librabbitmq',
'vhost': '/'})
self.assertEquals(ct, (('host', 'localhost'),
('library', 'librabbitmq'),
('password', 'guest'),
('port', 5672),
('user', 'guest'),
('vhost', '/')))
self.assertEqual(ct, (('host', 'localhost'),
('library', 'librabbitmq'),
('password', 'guest'),
('port', 5672),
('user', 'guest'),
('vhost', '/')))
self.assertEquals(ed, {'exchange_name': 'my_exchange',
'exchange_type': 'topic'})
self.assertEquals(et, (('exchange_name', 'my_exchange'),
('exchange_type', 'topic')))
self.assertEqual(ed, {'exchange_name': 'my_exchange',
'exchange_type': 'topic'})
self.assertEqual(et, (('exchange_name', 'my_exchange'),
('exchange_type', 'topic')))
kw = {'host': 'my_host', 'user': 'my_user', 'password': 'pwd',
'port': 123, 'vhost': 'virtual', 'library': 'my_lib',
@ -44,26 +58,25 @@ class TestConnectionManager(unittest.TestCase):
cd, ct, ed, et = self.mgr._extract_params(kw)
self.assertEquals(cd, {'host': 'my_host',
'port': 123,
'user': 'my_user',
'password': 'pwd',
'library': 'my_lib',
'vhost': 'virtual'})
self.assertEqual(cd, {'host': 'my_host',
'port': 123,
'user': 'my_user',
'password': 'pwd',
'library': 'my_lib',
'vhost': 'virtual'})
self.assertEquals(ct, (('host', 'my_host'),
('library', 'my_lib'),
('password', 'pwd'),
('port', 123),
('user', 'my_user'),
('vhost', 'virtual')))
self.assertEqual(ct, (('host', 'my_host'),
('library', 'my_lib'),
('password', 'pwd'),
('port', 123),
('user', 'my_user'),
('vhost', 'virtual')))
self.assertEquals(ed, {'exchange_name': 'my_exchange',
'exchange_type': 'foo'})
self.assertEquals(et, (('exchange_name', 'my_exchange'),
('exchange_type', 'foo')))
self.assertEqual(ed, {'exchange_name': 'my_exchange',
'exchange_type': 'foo'})
self.assertEqual(et, (('exchange_name', 'my_exchange'),
('exchange_type', 'foo')))
@mock.patch.object(pipeline_handler.ConnectionManager, '_extract_params')
@mock.patch.object(pipeline_handler.driver, 'create_connection')
@ -71,7 +84,7 @@ class TestConnectionManager(unittest.TestCase):
@mock.patch.object(pipeline_handler.driver, 'create_queue')
def test_get_connection(self, cq, ce, cc, ep):
conn = {'host': 'my_host', 'user': 'my_user', 'password': 'pwd',
'port': 123, 'vhost': 'virtual', 'library': 'my_lib'}
'port': 123, 'vhost': 'virtual', 'library': 'my_lib'}
conn_set = tuple(sorted(conn.items()))
exchange = {'exchange_name': 'my_exchange', 'exchange_type': 'foo'}
exchange_set = tuple(sorted(exchange.items()))
@ -89,17 +102,17 @@ class TestConnectionManager(unittest.TestCase):
final_connection, final_exchange = self.mgr.get_connection({}, "foo")
self.assertEquals(final_connection, connection)
self.assertEquals(final_exchange, mexchange)
self.assertEquals(1, queue.declare.call_count)
self.assertEqual(final_connection, connection)
self.assertEqual(final_exchange, mexchange)
self.assertEqual(1, queue.declare.call_count)
# Calling again should give the same results ...
final_connection, final_exchange = self.mgr.get_connection({}, "foo")
self.assertEquals(final_connection, connection)
self.assertEquals(final_exchange, mexchange)
self.assertEqual(final_connection, connection)
self.assertEqual(final_exchange, mexchange)
self.assertTrue(queue.declare.called)
self.assertEquals(1, queue.declare.call_count)
self.assertEqual(1, queue.declare.call_count)
# Change the exchange, and we should have same connection, but new
# exchange object.
@ -112,16 +125,16 @@ class TestConnectionManager(unittest.TestCase):
final_connection, final_exchange = self.mgr.get_connection({}, "foo")
self.assertEquals(final_connection, connection)
self.assertEquals(final_exchange, mexchange2)
self.assertEquals(2, queue.declare.call_count)
self.assertEqual(final_connection, connection)
self.assertEqual(final_exchange, mexchange2)
self.assertEqual(2, queue.declare.call_count)
# Change the connection, and we should have a new connection and new
# exchange object.
conn2 = {'host': 'my_host2', 'user': 'my_user2', 'password': 'pwd2',
'port': 1234, 'vhost': 'virtual2', 'library': 'my_lib2'}
'port': 1234, 'vhost': 'virtual2', 'library': 'my_lib2'}
conn2_set = tuple(sorted(conn2.items()))
exchange3= {'exchange_name': 'my_exchange', 'exchange_type': 'foo'}
exchange3 = {'exchange_name': 'my_exchange', 'exchange_type': 'foo'}
exchange3_set = tuple(sorted(exchange3.items()))
ep.return_value = (conn2, conn2_set, exchange3, exchange3_set)
@ -135,9 +148,9 @@ class TestConnectionManager(unittest.TestCase):
final_connection, final_exchange = self.mgr.get_connection({}, "foo")
self.assertEquals(final_connection, connection2)
self.assertEquals(final_exchange, mexchange3)
self.assertEquals(3, queue.declare.call_count)
self.assertEqual(final_connection, connection2)
self.assertEqual(final_exchange, mexchange3)
self.assertEqual(3, queue.declare.call_count)
class TestException(Exception):
@ -147,7 +160,7 @@ class TestException(Exception):
class TestNotabeneHandler(unittest.TestCase):
def test_constructor_no_queue(self):
with self.assertRaises(pipeline_handler.NotabeneException) as e:
with self.assertRaises(pipeline_handler.NotabeneException):
pipeline_handler.NotabeneHandler()
@mock.patch.object(pipeline_handler.connection_manager, 'get_connection')
@ -157,7 +170,7 @@ class TestNotabeneHandler(unittest.TestCase):
h = pipeline_handler.NotabeneHandler(**kw)
self.assertIsNotNone(h.connection)
self.assertIsNotNone(h.exchange)
self.assertEquals(h.env_keys, [])
self.assertEqual(h.env_keys, [])
@mock.patch.object(pipeline_handler.connection_manager, 'get_connection')
def test_constructor_env_keys(self, cm):
@ -166,7 +179,7 @@ class TestNotabeneHandler(unittest.TestCase):
h = pipeline_handler.NotabeneHandler(**kw)
self.assertIsNotNone(h.connection)
self.assertIsNotNone(h.exchange)
self.assertEquals(h.env_keys, ['x', 'y'])
self.assertEqual(h.env_keys, ['x', 'y'])
@mock.patch.object(pipeline_handler.connection_manager, 'get_connection')
def test_handle_events(self, cm):
@ -176,8 +189,8 @@ class TestNotabeneHandler(unittest.TestCase):
events = range(5)
env = {'x': ['cat', 'dog'], 'y': ['fish']}
ret = h.handle_events(events, env)
self.assertEquals(ret, events)
self.assertEquals(h.pending_notifications, ['cat', 'dog', 'fish'])
self.assertEqual(ret, events)
self.assertEqual(h.pending_notifications, ['cat', 'dog', 'fish'])
@mock.patch.object(pipeline_handler.connection_manager, 'get_connection')
def test_commit_good(self, cm):
@ -188,9 +201,9 @@ class TestNotabeneHandler(unittest.TestCase):
h.pending_notifications = [{'event_type': 'event1'},
{'event_type': 'event2'}]
with mock.patch.object(pipeline_handler.driver,
'send_notification') as sn:
'send_notification') as sn:
h.commit()
self.assertEquals(sn.call_count, 2)
self.assertEqual(sn.call_count, 2)
@mock.patch.object(pipeline_handler.connection_manager, 'get_connection')
def test_commit(self, cm):
@ -201,10 +214,10 @@ class TestNotabeneHandler(unittest.TestCase):
h.pending_notifications = [{'event_type': 'event1'},
{'event_type': 'event2'}]
with mock.patch.object(pipeline_handler.driver,
'send_notification') as sn:
'send_notification') as sn:
sn.side_effect = TestException
with mock.patch.object(pipeline_handler.logger,
'exception') as ex:
'exception') as ex:
h.commit()
self.assertEquals(ex.call_count, 2)
self.assertEquals(sn.call_count, 2)
self.assertEqual(ex.call_count, 2)
self.assertEqual(sn.call_count, 2)

View File

@ -1,14 +1,27 @@
# Copyright (c) 2014 Dark Secret Software Inc.
# Copyright (c) 2015 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest2 as unittest
import mock
import datetime
import timex
from winchester import debugging
from winchester import db as winch_db
from winchester import pipeline_manager
from winchester import debugging
from winchester.models import StreamState
from winchester import pipeline_manager
class TestPipeline(unittest.TestCase):
@ -19,9 +32,9 @@ class TestPipeline(unittest.TestCase):
self.fake_stream.id = "stream-1234"
def test_check_handler_config(self):
handler_map = {'test_thing': "blah"}
c = pipeline_manager.Pipeline.check_handler_config("test_thing", handler_map)
c = pipeline_manager.Pipeline.check_handler_config("test_thing",
handler_map)
self.assertIsInstance(c, dict)
self.assertIn('name', c)
self.assertIn('params', c)
@ -48,16 +61,17 @@ class TestPipeline(unittest.TestCase):
self.assertEqual(c['params'], {'book': 42})
with self.assertRaises(pipeline_manager.PipelineConfigError):
c = pipeline_manager.Pipeline.check_handler_config("other_thing", handler_map)
pipeline_manager.Pipeline.check_handler_config("other_thing",
handler_map)
with self.assertRaises(pipeline_manager.PipelineConfigError):
conf = dict(params={'book': 42})
c = pipeline_manager.Pipeline.check_handler_config(conf, handler_map)
pipeline_manager.Pipeline.check_handler_config(conf, handler_map)
def test_init(self):
conf = [dict(name='test_thing', params={'book': 42})]
handler_class = mock.MagicMock()
handler_map = {'test_thing': handler_class}
handler_map = {'test_thing': handler_class}
p = pipeline_manager.Pipeline("test_pipeline", conf, handler_map)
self.assertEqual(p.name, "test_pipeline")
self.assertEqual(len(p.handlers), 1)
@ -75,21 +89,25 @@ class TestPipeline(unittest.TestCase):
handler_class1 = mock.MagicMock(name='handler1')
handler_class2 = mock.MagicMock(name='handler2')
handler_class3 = mock.MagicMock(name='handler3')
handler_class3.return_value.handle_events.return_value = test_events + new_events
handler_class3.return_value.handle_events.return_value = (
test_events + new_events)
handler_map = {'test_thing': handler_class1,
handler_map = {'test_thing': handler_class1,
'other_thing': handler_class2,
'some_thing': handler_class3}
'some_thing': handler_class3}
p = pipeline_manager.Pipeline("test_pipeline", conf, handler_map)
p.commit = mock.MagicMock(name='commit')
p.rollback = mock.MagicMock(name='rollback')
ret = p.handle_events(test_events, self.fake_stream, self.debugger)
handler_class1.return_value.handle_events.assert_called_once_with(test_events, p.env)
handler_class1.return_value.handle_events.assert_called_once_with(
test_events, p.env)
events1 = handler_class1.return_value.handle_events.return_value
handler_class2.return_value.handle_events.assert_called_once_with(events1, p.env)
handler_class2.return_value.handle_events.assert_called_once_with(
events1, p.env)
events2 = handler_class2.return_value.handle_events.return_value
handler_class3.return_value.handle_events.assert_called_once_with(events2, p.env)
handler_class3.return_value.handle_events.assert_called_once_with(
events2, p.env)
p.commit.assert_called_once_with(self.debugger)
self.assertFalse(p.rollback.called)
self.assertEqual(ret, new_events)
@ -108,11 +126,12 @@ class TestPipeline(unittest.TestCase):
class WhackyError(Exception):
pass
handler_class2.return_value.handle_events.side_effect = WhackyError("whoa!")
handler_class2.return_value.handle_events.side_effect = WhackyError(
"whoa!")
handler_map = {'test_thing': handler_class1,
handler_map = {'test_thing': handler_class1,
'other_thing': handler_class2,
'some_thing': handler_class3}
'some_thing': handler_class3}
p = pipeline_manager.Pipeline("test_pipeline", conf, handler_map)
p.commit = mock.MagicMock(name='commit')
p.rollback = mock.MagicMock(name='rollback')
@ -130,9 +149,9 @@ class TestPipeline(unittest.TestCase):
handler_class2 = mock.MagicMock(name='handler2')
handler_class3 = mock.MagicMock(name='handler3')
handler_map = {'test_thing': handler_class1,
handler_map = {'test_thing': handler_class1,
'other_thing': handler_class2,
'some_thing': handler_class3}
'some_thing': handler_class3}
p = pipeline_manager.Pipeline("test_pipeline", conf, handler_map)
p.commit(self.debugger)
handler_class1.return_value.commit.assert_called_once_with()
@ -152,9 +171,9 @@ class TestPipeline(unittest.TestCase):
handler_class2.return_value.commit.side_effect = WhackyError("whoa!")
handler_map = {'test_thing': handler_class1,
handler_map = {'test_thing': handler_class1,
'other_thing': handler_class2,
'some_thing': handler_class3}
'some_thing': handler_class3}
p = pipeline_manager.Pipeline("test_pipeline", conf, handler_map)
p.commit(self.debugger)
handler_class1.return_value.commit.assert_called_once_with()
@ -169,9 +188,9 @@ class TestPipeline(unittest.TestCase):
handler_class2 = mock.MagicMock(name='handler2')
handler_class3 = mock.MagicMock(name='handler3')
handler_map = {'test_thing': handler_class1,
handler_map = {'test_thing': handler_class1,
'other_thing': handler_class2,
'some_thing': handler_class3}
'some_thing': handler_class3}
p = pipeline_manager.Pipeline("test_pipeline", conf, handler_map)
p.rollback(self.debugger)
handler_class1.return_value.rollback.assert_called_once_with()
@ -191,9 +210,9 @@ class TestPipeline(unittest.TestCase):
handler_class2.return_value.rollback.side_effect = WhackyError("whoa!")
handler_map = {'test_thing': handler_class1,
handler_map = {'test_thing': handler_class1,
'other_thing': handler_class2,
'some_thing': handler_class3}
'some_thing': handler_class3}
p = pipeline_manager.Pipeline("test_pipeline", conf, handler_map)
p.rollback(self.debugger)
handler_class1.return_value.rollback.assert_called_once_with()
@ -202,7 +221,6 @@ class TestPipeline(unittest.TestCase):
class TestPipelineManager(unittest.TestCase):
def setUp(self):
super(TestPipelineManager, self).setUp()
self.debugger = debugging.NoOpDebugger()
@ -214,7 +232,8 @@ class TestPipelineManager(unittest.TestCase):
pm.purge_completed_streams = False
stream = "test stream"
pm._complete_stream(stream)
pm.db.set_stream_state.assert_called_once_with(stream, StreamState.completed)
pm.db.set_stream_state.assert_called_once_with(stream,
StreamState.completed)
@mock.patch.object(pipeline_manager.ConfigManager, 'wrap')
def test_complete_stream_purge(self, mock_config_wrap):
@ -231,7 +250,8 @@ class TestPipelineManager(unittest.TestCase):
pm.db = mock.MagicMock(spec=pm.db)
stream = "test stream"
pm._error_stream(stream)
pm.db.set_stream_state.assert_called_once_with(stream, StreamState.error)
pm.db.set_stream_state.assert_called_once_with(stream,
StreamState.error)
@mock.patch.object(pipeline_manager.ConfigManager, 'wrap')
def test_expire_error_stream(self, mock_config_wrap):
@ -239,7 +259,8 @@ class TestPipelineManager(unittest.TestCase):
pm.db = mock.MagicMock(spec=pm.db)
stream = "test stream"
pm._expire_error_stream(stream)
pm.db.set_stream_state.assert_called_once_with(stream, StreamState.expire_error)
pm.db.set_stream_state.assert_called_once_with(
stream, StreamState.expire_error)
@mock.patch('winchester.pipeline_manager.Pipeline', autospec=True)
@mock.patch.object(pipeline_manager.ConfigManager, 'wrap')
@ -283,7 +304,7 @@ class TestPipelineManager(unittest.TestCase):
pm.pipeline_handlers = mock.MagicMock(name='pipeline_handlers')
pipeline = mock_pipeline.return_value
pipeline.handle_events.side_effect = \
pipeline_manager.PipelineExecutionError('test', 'thing')
pipeline_manager.PipelineExecutionError('test', 'thing')
ret = pm._run_pipeline(stream, trigger_def, pipeline_name,
pipeline_config)
@ -318,7 +339,8 @@ class TestPipelineManager(unittest.TestCase):
pm.db.set_stream_state.assert_called_once_with(stream,
StreamState.firing)
pm._run_pipeline.assert_called_once_with(stream, trigger_def,
'test_fire_pipeline', pipeline_config)
'test_fire_pipeline',
pipeline_config)
self.assertFalse(pm._error_stream.called)
pm._complete_stream.assert_called_once_with(stream)
self.assertTrue(ret)
@ -341,7 +363,8 @@ class TestPipelineManager(unittest.TestCase):
pm._run_pipeline.return_value = True
ret = pm.fire_stream(stream)
pm.db.set_stream_state.assert_called_once_with(stream, StreamState.firing)
pm.db.set_stream_state.assert_called_once_with(stream,
StreamState.firing)
self.assertFalse(pm._run_pipeline.called)
self.assertFalse(pm._error_stream.called)
self.assertFalse(pm._complete_stream.called)
@ -364,7 +387,7 @@ class TestPipelineManager(unittest.TestCase):
ret = pm.fire_stream(stream)
pm.db.set_stream_state.assert_called_once_with(stream,
StreamState.firing)
StreamState.firing)
self.assertFalse(pm._error_stream.called)
self.assertFalse(pm._run_pipeline.called)
pm._complete_stream.assert_called_once_with(stream)
@ -391,8 +414,8 @@ class TestPipelineManager(unittest.TestCase):
pm.db.set_stream_state.assert_called_once_with(stream,
StreamState.firing)
pm._run_pipeline.assert_called_once_with(stream, trigger_def,
'test_fire_pipeline',
pipeline_config)
'test_fire_pipeline',
pipeline_config)
self.assertFalse(pm._complete_stream.called)
pm._error_stream.assert_called_once_with(stream)
self.assertFalse(ret)
@ -418,7 +441,8 @@ class TestPipelineManager(unittest.TestCase):
pm.db.set_stream_state.assert_called_once_with(stream,
StreamState.expiring)
pm._run_pipeline.assert_called_once_with(stream, trigger_def,
'test_fire_pipeline', pipeline_config)
'test_fire_pipeline',
pipeline_config)
self.assertFalse(pm._error_stream.called)
pm._complete_stream.assert_called_once_with(stream)
self.assertTrue(ret)
@ -441,7 +465,8 @@ class TestPipelineManager(unittest.TestCase):
pm._run_pipeline.return_value = True
ret = pm.expire_stream(stream)
pm.db.set_stream_state.assert_called_once_with(stream, StreamState.expiring)
pm.db.set_stream_state.assert_called_once_with(stream,
StreamState.expiring)
self.assertFalse(pm._run_pipeline.called)
self.assertFalse(pm._expire_error_stream.called)
self.assertFalse(pm._complete_stream.called)
@ -464,7 +489,7 @@ class TestPipelineManager(unittest.TestCase):
ret = pm.expire_stream(stream)
pm.db.set_stream_state.assert_called_once_with(stream,
StreamState.expiring)
StreamState.expiring)
self.assertFalse(pm._expire_error_stream.called)
self.assertFalse(pm._run_pipeline.called)
pm._complete_stream.assert_called_once_with(stream)
@ -491,7 +516,8 @@ class TestPipelineManager(unittest.TestCase):
pm.db.set_stream_state.assert_called_once_with(stream,
StreamState.expiring)
pm._run_pipeline.assert_called_once_with(stream, trigger_def,
'test_fire_pipeline', pipeline_config)
'test_fire_pipeline',
pipeline_config)
self.assertFalse(pm._complete_stream.called)
pm._expire_error_stream.assert_called_once_with(stream)
self.assertFalse(ret)
@ -510,8 +536,8 @@ class TestPipelineManager(unittest.TestCase):
pm.db.get_ready_streams.return_value = [stream]
ret = pm.process_ready_streams(42)
pm.db.get_ready_streams.assert_called_once_with(42,
pm.current_time.return_value, expire=False)
pm.db.get_ready_streams.assert_called_once_with(
42, pm.current_time.return_value, expire=False)
pm.fire_stream.assert_called_once_with(stream)
self.assertFalse(pm.expire_stream.called)
self.assertEqual(ret, 1)
@ -528,8 +554,8 @@ class TestPipelineManager(unittest.TestCase):
pm.db.get_ready_streams.return_value = [stream]
ret = pm.process_ready_streams(42, expire=True)
pm.db.get_ready_streams.assert_called_once_with(42,
pm.current_time.return_value, expire=True)
pm.db.get_ready_streams.assert_called_once_with(
42, pm.current_time.return_value, expire=True)
pm.expire_stream.assert_called_once_with(stream)
self.assertFalse(pm.fire_stream.called)
self.assertEqual(ret, 1)
@ -543,4 +569,3 @@ class TestPipelineManager(unittest.TestCase):
self.assertEqual(pm.safe_get_debugger(tdef), self.debugger)
self.assertEqual(pm.safe_get_debugger(None)._name, "n/a")

View File

@ -1,3 +1,19 @@
# Copyright (c) 2014 Dark Secret Software Inc.
# Copyright (c) 2015 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest2 as unittest
import datetime
@ -31,7 +47,7 @@ class TestTimeSyncNoEndpoint(unittest.TestCase):
def test_publish(self):
with mock.patch.object(time_sync.dateutil.parser, "parse") as p:
self.time_sync.publish("foo")
self.assertEquals(0, p.call_count)
self.assertEqual(0, p.call_count)
class BlowUp(Exception):
@ -42,14 +58,14 @@ class TestTimeSyncEndpointPublisher(unittest.TestCase):
def setUp(self):
super(TestTimeSyncEndpointPublisher, self).setUp()
self.time_sync = time_sync.TimeSync(
{"time_sync_endpoint":"example.com"}, publishes=True)
{"time_sync_endpoint": "example.com"}, publishes=True)
def test_fetch_good(self):
with mock.patch.object(time_sync.requests, "get") as r:
response = mock.MagicMock()
response.text = "now"
r.return_value = response
self.assertEquals("now", self.time_sync._fetch())
self.assertEqual("now", self.time_sync._fetch())
def test_fetch_empty(self):
with mock.patch.object(time_sync.time, "sleep") as t:
@ -74,8 +90,8 @@ class TestTimeSyncEndpointPublisher(unittest.TestCase):
def test_current_time(self):
self.time_sync.last_tyme = "now"
with mock.patch.object(self.time_sync, "_should_update") as u:
self.assertEquals("now", self.time_sync.current_time())
self.assertEquals(0, u.call_count)
self.assertEqual("now", self.time_sync.current_time())
self.assertEqual(0, u.call_count)
def test_publish(self):
with mock.patch.object(time_sync.dateutil.parser, "parse") as p:
@ -100,25 +116,25 @@ class TestTimeSyncEndpointPublisher(unittest.TestCase):
r.side_effect = BlowUp
with mock.patch.object(time_sync.logger, "exception") as e:
self.time_sync.publish("string datetime")
self.assertEquals(1, e.call_count)
self.assertEqual(1, e.call_count)
class TestTimeSyncEndpointConsumer(unittest.TestCase):
def setUp(self):
super(TestTimeSyncEndpointConsumer, self).setUp()
self.time_sync = time_sync.TimeSync(
{"time_sync_endpoint":"example.com"})
{"time_sync_endpoint": "example.com"})
def test_current_time(self):
with mock.patch.object(self.time_sync, "_should_update") as u:
u.return_value = True
with mock.patch.object(time_sync.dateutil.parser, "parse") as p:
with mock.patch.object(self.time_sync, "_should_update") as u:
u.return_value = True
with mock.patch.object(time_sync.dateutil.parser, "parse") as p:
p.return_value = "datetime object"
with mock.patch.object(self.time_sync, "_fetch") as r:
r.return_value = "string datetime"
self.assertEquals(self.time_sync.current_time(),
"datetime object")
self.assertEqual(self.time_sync.current_time(),
"datetime object")
def test_current_time_fails(self):
self.time_sync.last_tyme = "no change"
@ -127,6 +143,6 @@ class TestTimeSyncEndpointConsumer(unittest.TestCase):
with mock.patch.object(self.time_sync, "_fetch") as r:
r.side_effect = BlowUp
with mock.patch.object(time_sync.logger, "exception") as e:
self.assertEquals(self.time_sync.current_time(),
"no change")
self.assertEquals(1, e.call_count)
self.assertEqual(self.time_sync.current_time(),
"no change")
self.assertEqual(1, e.call_count)

View File

@ -1,13 +1,27 @@
# Copyright (c) 2014 Dark Secret Software Inc.
# Copyright (c) 2015 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest2 as unittest
import mock
import datetime
import timex
from winchester import db as winch_db
from winchester import debugging
from winchester import definition
from winchester import trigger_manager
@ -22,13 +36,14 @@ class TestTriggerManager(unittest.TestCase):
tm = trigger_manager.TriggerManager('test')
tm.db = mock.MagicMock(spec=tm.db)
event = dict(message_id='1234-test-5678',
timestamp=datetime.datetime(2014,8,1,10,9,8,77777),
timestamp=datetime.datetime(2014, 8, 1, 10, 9, 8, 77777),
event_type='test.thing',
test_trait="foobar",
other_test_trait=42)
self.assertTrue(tm.save_event(event))
tm.db.create_event.assert_called_once_with('1234-test-5678', 'test.thing',
datetime.datetime(2014,8,1,10,9,8,77777),
tm.db.create_event.assert_called_once_with(
'1234-test-5678', 'test.thing',
datetime.datetime(2014, 8, 1, 10, 9, 8, 77777),
dict(test_trait='foobar', other_test_trait=42))
@mock.patch.object(trigger_manager.ConfigManager, 'wrap')
@ -37,13 +52,14 @@ class TestTriggerManager(unittest.TestCase):
tm.db = mock.MagicMock(spec=tm.db)
tm.db.create_event.side_effect = winch_db.DuplicateError("test boom!")
event = dict(message_id='1234-test-5678',
timestamp=datetime.datetime(2014,8,1,10,9,8,77777),
timestamp=datetime.datetime(2014, 8, 1, 10, 9, 8, 77777),
event_type='test.thing',
test_trait="foobar",
other_test_trait=42)
self.assertFalse(tm.save_event(event))
tm.db.create_event.assert_called_once_with('1234-test-5678', 'test.thing',
datetime.datetime(2014,8,1,10,9,8,77777),
tm.db.create_event.assert_called_once_with(
'1234-test-5678', 'test.thing',
datetime.datetime(2014, 8, 1, 10, 9, 8, 77777),
dict(test_trait='foobar', other_test_trait=42))
@mock.patch('winchester.trigger_manager.EventCondenser', autospec=True)
@ -64,12 +80,14 @@ class TestTriggerManager(unittest.TestCase):
mock_condenser.assert_called_once_with(tm.db)
cond.clear.assert_called_once_with()
cond.validate.assert_called_once_with()
tm.distiller.to_event.assert_called_once_with('test notification here', cond)
self.assertEquals(res, test_event)
tm.distiller.to_event.assert_called_once_with('test notification here',
cond)
self.assertEqual(res, test_event)
@mock.patch('winchester.trigger_manager.EventCondenser', autospec=True)
@mock.patch.object(trigger_manager.ConfigManager, 'wrap')
def test_convert_notification_dropped(self, mock_config_wrap, mock_condenser):
def test_convert_notification_dropped(self, mock_config_wrap,
mock_condenser):
tm = trigger_manager.TriggerManager('test')
tm.db = mock.MagicMock(spec=tm.db)
tm.distiller = mock.MagicMock(spec=tm.distiller)
@ -81,7 +99,8 @@ class TestTriggerManager(unittest.TestCase):
tm.save_event = mock.MagicMock()
tm.save_event.return_value = True
test_notif = dict(event_type='test.notification.here', message_id='4242-4242')
test_notif = dict(event_type='test.notification.here',
message_id='4242-4242')
res = tm.convert_notification(test_notif)
mock_condenser.assert_called_once_with(tm.db)
cond.clear.assert_called_once_with()
@ -92,7 +111,8 @@ class TestTriggerManager(unittest.TestCase):
@mock.patch('winchester.trigger_manager.EventCondenser', autospec=True)
@mock.patch.object(trigger_manager.ConfigManager, 'wrap')
def test_convert_notification_invalid(self, mock_config_wrap, mock_condenser):
def test_convert_notification_invalid(self, mock_config_wrap,
mock_condenser):
tm = trigger_manager.TriggerManager('test')
tm.db = mock.MagicMock(spec=tm.db)
tm.distiller = mock.MagicMock(spec=tm.distiller)
@ -104,7 +124,8 @@ class TestTriggerManager(unittest.TestCase):
tm.save_event = mock.MagicMock()
tm.save_event.return_value = True
test_notif = dict(event_type='test.notification.here', message_id='4242-4242')
test_notif = dict(event_type='test.notification.here',
message_id='4242-4242')
res = tm.convert_notification(test_notif)
mock_condenser.assert_called_once_with(tm.db)
cond.clear.assert_called_once_with()
@ -124,12 +145,13 @@ class TestTriggerManager(unittest.TestCase):
event = "eventful!"
ret = tm._add_or_create_stream(trigger_def, event, dist_traits)
tm.db.get_active_stream.assert_called_once_with(trigger_def.name,
dist_traits, tm.current_time.return_value)
tm.db.get_active_stream.assert_called_once_with(
trigger_def.name,
dist_traits, tm.current_time.return_value)
self.assertFalse(tm.db.create_stream.called)
tm.db.add_event_stream.assert_called_once_with(
tm.db.get_active_stream.return_value,
event, trigger_def.expiration)
tm.db.get_active_stream.return_value,
event, trigger_def.expiration)
self.assertEqual(ret, tm.db.get_active_stream.return_value)
@mock.patch.object(trigger_manager.ConfigManager, 'wrap')
@ -143,10 +165,12 @@ class TestTriggerManager(unittest.TestCase):
event = "eventful!"
ret = tm._add_or_create_stream(trigger_def, event, dist_traits)
tm.db.get_active_stream.assert_called_once_with(trigger_def.name, dist_traits,
tm.current_time.return_value)
tm.db.create_stream.assert_called_once_with(trigger_def.name, event, dist_traits,
trigger_def.expiration)
tm.db.get_active_stream.assert_called_once_with(
trigger_def.name, dist_traits,
tm.current_time.return_value)
tm.db.create_stream.assert_called_once_with(
trigger_def.name, event, dist_traits,
trigger_def.expiration)
self.assertFalse(tm.db.add_event_stream.called)
self.assertEqual(ret, tm.db.create_stream.return_value)
@ -159,8 +183,10 @@ class TestTriggerManager(unittest.TestCase):
test_stream = mock.MagicMock()
tm._ready_to_fire(test_stream, trigger_def)
trigger_def.get_fire_timestamp.assert_called_once_with(tm.current_time.return_value)
tm.db.stream_ready_to_fire.assert_called_once_with(test_stream,
trigger_def.get_fire_timestamp.assert_called_once_with(
tm.current_time.return_value)
tm.db.stream_ready_to_fire.assert_called_once_with(
test_stream,
trigger_def.get_fire_timestamp.return_value)
@mock.patch.object(trigger_manager.ConfigManager, 'wrap')
@ -171,7 +197,8 @@ class TestTriggerManager(unittest.TestCase):
tm.add_notification("test notification")
tm.convert_notification.assert_called_once_with("test notification")
tm.add_event.assert_called_once_with(tm.convert_notification.return_value)
tm.add_event.assert_called_once_with(
tm.convert_notification.return_value)
@mock.patch.object(trigger_manager.ConfigManager, 'wrap')
def test_add_notification_invalid_or_dropped(self, mock_config_wrap):
@ -205,16 +232,18 @@ class TestTriggerManager(unittest.TestCase):
tm.save_event.assert_called_once_with(event)
for td in tm.trigger_definitions:
td.match.assert_called_once_with(event)
m_def.get_distinguishing_traits.assert_called_once_with(event,
m_def.match.return_value)
tm._add_or_create_stream.assert_called_once_with(m_def, event,
m_def.get_distinguishing_traits.assert_called_once_with(
event,
m_def.match.return_value)
tm._add_or_create_stream.assert_called_once_with(
m_def, event,
m_def.get_distinguishing_traits.return_value)
tm.db.get_stream_events.assert_called_once_with(
tm._add_or_create_stream.return_value)
tm._add_or_create_stream.return_value)
m_def.should_fire.assert_called_once_with(
tm.db.get_stream_events.return_value)
tm.db.get_stream_events.return_value)
tm._ready_to_fire.assert_called_once_with(
tm._add_or_create_stream.return_value, m_def)
tm._add_or_create_stream.return_value, m_def)
@mock.patch.object(trigger_manager.ConfigManager, 'wrap')
def test_add_event_on_ready_stream(self, mock_config_wrap):
@ -235,9 +264,11 @@ class TestTriggerManager(unittest.TestCase):
tm.save_event.assert_called_once_with(event)
for td in tm.trigger_definitions:
td.match.assert_called_once_with(event)
m_def.get_distinguishing_traits.assert_called_once_with(event,
m_def.match.return_value)
tm._add_or_create_stream.assert_called_once_with(m_def, event,
m_def.get_distinguishing_traits.assert_called_once_with(
event,
m_def.match.return_value)
tm._add_or_create_stream.assert_called_once_with(
m_def, event,
m_def.get_distinguishing_traits.return_value)
self.assertFalse(tm.db.get_stream_events.called)
self.assertFalse(m_def.should_fire.called)

View File

@ -1,3 +1,19 @@
# Copyright (c) 2014 Dark Secret Software Inc.
# Copyright (c) 2015 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest2 as unittest
import datetime
@ -95,8 +111,8 @@ class TestUsageHandler(unittest.TestCase):
def test_extract_launched_at(self):
with self.assertRaises(pipeline_handler.UsageException):
self.handler._extract_launched_at({})
self.assertEquals("foo", self.handler._extract_launched_at(
{'launched_at': 'foo'}))
self.assertEqual("foo", self.handler._extract_launched_at(
{'launched_at': 'foo'}))
def test_extract_interesting(self):
interesting = ["a", "b", "c"]
@ -105,9 +121,9 @@ class TestUsageHandler(unittest.TestCase):
e3 = {'event_type': 'c'}
e4 = {'event_type': 'd'}
e5 = {'event_type': 'e'}
self.assertEquals([e1, e2, e3],
self.handler._extract_interesting_events(
[e4, e1, e2, e3, e5], interesting))
self.assertEqual([e1, e2, e3],
self.handler._extract_interesting_events(
[e4, e1, e2, e3, e5], interesting))
def test_verify_fields_no_match(self):
exists = {'a': 1, 'b': 2, 'c': 3}
@ -129,7 +145,7 @@ class TestUsageHandler(unittest.TestCase):
with self.assertRaises(pipeline_handler.UsageException) as e:
self.handler._confirm_delete({'deleted_at': 'now',
'state': 'active'}, [], [])
self.assertEquals("U3", e.code)
self.assertEqual("U3", e.code)
deleted_at = datetime.datetime(2014, 12, 31, 1, 0, 0)
launched_at = datetime.datetime(2014, 12, 31, 2, 0, 0)
@ -137,7 +153,7 @@ class TestUsageHandler(unittest.TestCase):
self.handler._confirm_delete({'deleted_at': deleted_at,
'launched_at': launched_at,
'state': 'deleted'}, [], [])
self.assertEquals("U4", e.code)
self.assertEqual("U4", e.code)
apb = datetime.datetime(2014, 12, 30, 0, 0, 0)
ape = datetime.datetime(2014, 12, 31, 0, 0, 0)
@ -149,7 +165,7 @@ class TestUsageHandler(unittest.TestCase):
'audit_period_beginning': apb,
'audit_period_ending': ape,
'state': 'deleted'}, [], [])
self.assertEquals("U5", e.code)
self.assertEqual("U5", e.code)
# Test the do-nothing scenario
self.handler._confirm_delete({}, [], [])
@ -157,11 +173,11 @@ class TestUsageHandler(unittest.TestCase):
def test_confirm_delete_with_delete_events(self):
with self.assertRaises(pipeline_handler.UsageException) as e:
self.handler._confirm_delete({}, [{}], [])
self.assertEquals("U6", e.code)
self.assertEqual("U6", e.code)
with self.assertRaises(pipeline_handler.UsageException) as e:
self.handler._confirm_delete({'deleted_at': 'now'}, [{}, {}], [])
self.assertEquals("U7", e.code)
self.assertEqual("U7", e.code)
with mock.patch.object(self.handler, "_verify_fields") as v:
exists = {'deleted_at': 'now', 'state': 'deleted'}
@ -181,53 +197,53 @@ class TestUsageHandler(unittest.TestCase):
'audit_period_beginning': apb,
'audit_period_ending': ape,
'launched_at': launched_at})
self.assertEquals("U8", e.code)
self.assertEqual("U8", e.code)
def test_process_block_exists(self):
exists = {'event_type':'compute.instance.exists', 'timestamp':'now',
'instance_id':'inst'}
exists = {'event_type': 'compute.instance.exists', 'timestamp': 'now',
'instance_id': 'inst'}
self.handler.stream_id = 123
with mock.patch.object(self.handler, "_do_checks") as c:
with mock.patch.object(self.handler, "_do_checks"):
events = self.handler._process_block([], exists)
self.assertEquals(1, len(events))
self.assertEqual(1, len(events))
f = events[0]
self.assertEquals("compute.instance.exists.verified",
f['event_type'])
self.assertEquals("now", f['timestamp'])
self.assertEquals(123, f['stream_id'])
self.assertEquals("inst", f['payload']['instance_id'])
self.assertEquals("None", f['error'])
self.assertEqual("compute.instance.exists.verified",
f['event_type'])
self.assertEqual("now", f['timestamp'])
self.assertEqual(123, f['stream_id'])
self.assertEqual("inst", f['payload']['instance_id'])
self.assertEqual("None", f['error'])
self.assertIsNone(f['error_code'])
def test_process_block_bad(self):
exists = {'event_type': 'compute.instance.exists', 'timestamp':'now',
'instance_id':'inst'}
exists = {'event_type': 'compute.instance.exists', 'timestamp': 'now',
'instance_id': 'inst'}
self.handler.stream_id = 123
with mock.patch.object(self.handler, "_do_checks") as c:
c.side_effect = pipeline_handler.UsageException("UX", "Error")
events = self.handler._process_block([], exists)
self.assertEquals(1, len(events))
self.assertEqual(1, len(events))
f = events[0]
self.assertEquals("compute.instance.exists.failed",
f['event_type'])
self.assertEquals("now", f['timestamp'])
self.assertEquals(123, f['stream_id'])
self.assertEquals("inst", f['payload']['instance_id'])
self.assertEquals("Error", f['error'])
self.assertEquals("UX", f['error_code'])
self.assertEqual("compute.instance.exists.failed",
f['event_type'])
self.assertEqual("now", f['timestamp'])
self.assertEqual(123, f['stream_id'])
self.assertEqual("inst", f['payload']['instance_id'])
self.assertEqual("Error", f['error'])
self.assertEqual("UX", f['error_code'])
def test_process_block_warnings(self):
self.handler.warnings = ['one', 'two']
exists = {'event_type': 'compute.instance.exists',
'timestamp':'now', 'instance_id':'inst'}
'timestamp': 'now', 'instance_id': 'inst'}
self.handler.stream_id = 123
with mock.patch.object(self.handler, "_do_checks") as c:
with mock.patch.object(self.handler, "_do_checks"):
events = self.handler._process_block([], exists)
self.assertEquals(2, len(events))
self.assertEquals("compute.instance.exists.warnings",
events[0]['event_type'])
self.assertEquals("compute.instance.exists.verified",
events[1]['event_type'])
self.assertEqual(2, len(events))
self.assertEqual("compute.instance.exists.warnings",
events[0]['event_type'])
self.assertEqual("compute.instance.exists.verified",
events[1]['event_type'])
@mock.patch.object(pipeline_handler.UsageHandler, '_confirm_launched_at')
@mock.patch.object(pipeline_handler.UsageHandler, '_get_core_fields')
@ -279,17 +295,17 @@ class TestUsageHandler(unittest.TestCase):
def test_handle_events_no_data(self):
env = {'stream_id': 123}
events = self.handler.handle_events([], env)
self.assertEquals(0, len(events))
self.assertEqual(0, len(events))
def test_handle_events_no_exists(self):
env = {'stream_id': 123}
raw = [{'event_type': 'foo'}]
events = self.handler.handle_events(raw, env)
self.assertEquals(1, len(events))
self.assertEqual(1, len(events))
notifications = env['usage_notifications']
self.assertEquals(1, len(notifications))
self.assertEquals("compute.instance.exists.failed",
notifications[0]['event_type'])
self.assertEqual(1, len(notifications))
self.assertEqual("compute.instance.exists.failed",
notifications[0]['event_type'])
@mock.patch.object(pipeline_handler.UsageHandler, '_process_block')
def test_handle_events_exists(self, pb):
@ -297,20 +313,21 @@ class TestUsageHandler(unittest.TestCase):
raw = [{'event_type': 'foo'},
{'event_type': 'compute.instance.exists'}]
events = self.handler.handle_events(raw, env)
self.assertEquals(2, len(events))
self.assertEqual(2, len(events))
self.assertTrue(pb.called)
@mock.patch.object(pipeline_handler.UsageHandler, '_process_block')
def test_handle_events_dangling(self, pb):
env = {'stream_id': 123}
raw = [{'event_type': 'foo'},
{'event_type': 'compute.instance.exists'},
{'event_type': 'foo'},
]
raw = [
{'event_type': 'foo'},
{'event_type': 'compute.instance.exists'},
{'event_type': 'foo'},
]
events = self.handler.handle_events(raw, env)
self.assertEquals(3, len(events))
self.assertEqual(3, len(events))
notifications = env['usage_notifications']
self.assertEquals(1, len(notifications))
self.assertEquals("compute.instance.exists.failed",
notifications[0]['event_type'])
self.assertEqual(1, len(notifications))
self.assertEqual("compute.instance.exists.failed",
notifications[0]['event_type'])
self.assertTrue(pb.called)

10
tox.ini
View File

@ -1,5 +1,5 @@
[tox]
envlist = py26,py27
envlist = py26,py27,pep8
[testenv]
deps =
@ -13,3 +13,11 @@ commands =
sitepackages = False
[testenv:pep8]
commands =
flake8
[flake8]
ignore =
exclude=.venv,.git,.tox,dist,doc,*lib/python*,*egg,*db/__init__.py,*db/migrations/versions/*_.py
show-source = True

View File

@ -1,3 +1,19 @@
# Copyright (c) 2014 Dark Secret Software Inc.
# Copyright (c) 2015 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import logging
import os
@ -98,12 +114,13 @@ class ConfigManager(collections.Mapping):
prefix = prefix + '/'
for r in self._required:
if r not in self:
msg = "Required Configuration setting %s%s is missing!" % (prefix,r)
msg = "Required Configuration setting %s%s is missing!" % (
prefix, r)
logger.error(msg)
raise ConfigurationError(msg)
for k, item in self.items():
if hasattr(item, 'check_config'):
item.check_config(prefix="%s%s" % (prefix,k))
item.check_config(prefix="%s%s" % (prefix, k))
@classmethod
def _load_yaml_config(cls, config_data, filename="(unknown)"):
@ -115,13 +132,13 @@ class ConfigManager(collections.Mapping):
if hasattr(err, 'problem_mark'):
mark = err.problem_mark
errmsg = ("Invalid YAML syntax in Configuration file "
"%(file)s at line: %(line)s, column: %(column)s."
"%(file)s at line: %(line)s, column: %(column)s."
% dict(file=filename,
line=mark.line + 1,
column=mark.column + 1))
else:
errmsg = ("YAML error reading Configuration file "
"%(file)s"
"%(file)s"
% dict(file=filename))
logger.error(errmsg)
raise
@ -147,18 +164,19 @@ class ConfigManager(collections.Mapping):
paths = ['.']
if filetype is None:
if (filename.lower().endswith('.yaml') or
filename.lower().endswith('.yml')):
filename.lower().endswith('.yml')):
filetype = 'yaml'
elif filename.lower().endswith('.json'):
filetype = 'json'
elif (filename.lower().endswith('.conf') or
filename.lower().endswith('.ini')):
elif (filename.lower().endswith('.conf')
or filename.lower().endswith('.ini')):
filetype = 'ini'
else:
filetype = 'yaml'
data = cls._load_file(filename, paths)
if data is None:
raise ConfigurationError("Cannot find or read config file: %s" % filename)
raise ConfigurationError(
"Cannot find or read config file: %s" % filename)
try:
loader = getattr(cls, "_load_%s_config" % filetype)
except AttributeError:
@ -166,6 +184,5 @@ class ConfigManager(collections.Mapping):
return loader(data, filename=filename)
def load_file(self, filename, filetype=None):
return self.load_config_file(filename, filetype, paths=self.config_paths)
return self.load_config_file(filename, filetype,
paths=self.config_paths)

View File

@ -1,5 +1,3 @@
from winchester.db.interface import DuplicateError, LockError
from winchester.db.interface import NoSuchEventError, NoSuchStreamError
from winchester.db.interface import DBInterface

View File

@ -1,4 +1,22 @@
from alembic import util, command, config
# Copyright (c) 2014 Dark Secret Software Inc.
# Copyright (c) 2015 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from alembic import command
from alembic import config
from alembic import util
import argparse
import inspect
@ -16,35 +34,34 @@ class AlembicCommandLine(object):
if allowed_commands is not None:
self.allowed_commands = allowed_commands
self.parser = self.generate_options()
def add_command_options(self, parser, positional, kwargs):
if 'template' in kwargs:
parser.add_argument("-t", "--template",
default='generic',
type=str,
help="Setup template for use with 'init'")
default='generic',
type=str,
help="Setup template for use with 'init'")
if 'message' in kwargs:
parser.add_argument("-m", "--message",
type=str,
help="Message string to use with 'revision'")
type=str,
help="Message string to use with 'revision'")
if 'sql' in kwargs:
parser.add_argument("--sql",
action="store_true",
help="Don't emit SQL to database - dump to "
"standard output/file instead")
action="store_true",
help="Don't emit SQL to database - dump to "
"standard output/file instead")
if 'tag' in kwargs:
parser.add_argument("--tag",
type=str,
help="Arbitrary 'tag' name - can be used by "
"custom env.py scripts.")
type=str,
help="Arbitrary 'tag' name - can be used by "
"custom env.py scripts.")
if 'autogenerate' in kwargs:
parser.add_argument("--autogenerate",
action="store_true",
help="Populate revision script with candidate "
"migration operations, based on comparison "
"of database to model.")
action="store_true",
help="Populate revision script with "
"candidate migration operations, based "
"on comparison of database to model.")
# "current" command
if 'head_only' in kwargs:
parser.add_argument("--head-only",
@ -58,7 +75,6 @@ class AlembicCommandLine(object):
help="Specify a revision range; "
"format is [start]:[end]")
positional_help = {
'directory': "location of scripts directory",
'revision': "revision identifier"
@ -96,11 +112,11 @@ class AlembicCommandLine(object):
cmds = []
for fn in [getattr(command, n) for n in dir(command)]:
if (inspect.isfunction(fn) and
fn.__name__[0] != '_' and
fn.__module__ == 'alembic.command'):
fn.__name__[0] != '_' and
fn.__module__ == 'alembic.command'):
if (self.allowed_commands and
fn.__name__ not in self.allowed_commands):
fn.__name__ not in self.allowed_commands):
continue
spec = inspect.getargspec(fn)
@ -123,7 +139,7 @@ class AlembicCommandLine(object):
try:
fn(config, *[getattr(options, k) for k in positional],
**dict((k, getattr(options, k)) for k in kwarg))
**dict((k, getattr(options, k)) for k in kwarg))
except util.CommandError as e:
util.err(str(e))

View File

@ -1,4 +1,19 @@
import argparse
# Copyright (c) 2014 Dark Secret Software Inc.
# Copyright (c) 2015 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import alembic
import logging
@ -17,11 +32,11 @@ class DBAdminCommandLine(AlembicCommandLine):
type=str,
help='The name of the winchester config file')
def get_config(self, options):
alembic_cfg = alembic.config.Config()
alembic_cfg.set_main_option("winchester_config", options.config)
alembic_cfg.set_main_option("script_location", "winchester.db:migrations")
alembic_cfg.set_main_option("script_location",
"winchester.db:migrations")
return alembic_cfg

View File

@ -1,14 +1,30 @@
# Copyright (c) 2014 Dark Secret Software Inc.
# Copyright (c) 2015 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from contextlib import contextmanager
import logging
import sqlalchemy
from sqlalchemy import and_, or_
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.orm.exc import MultipleResultsFound
import sqlalchemy
from sqlalchemy import and_
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.orm import sessionmaker
from winchester.config import ConfigItem
from winchester.config import ConfigManager
from winchester import models
from winchester.config import ConfigManager, ConfigSection, ConfigItem
logger = logging.getLogger(__name__)
@ -42,16 +58,17 @@ def sessioned(func):
kw['session'] = session
retval = func(self, *args, **kw)
return retval
return with_session
class DBInterface(object):
@classmethod
def config_description(cls):
return dict(url=ConfigItem(required=True,
help="Connection URL for database."),
)
return dict(
url=ConfigItem(required=True,
help="Connection URL for database."),
)
def __init__(self, config):
self.config = ConfigManager.wrap(config, self.config_description())
@ -93,7 +110,7 @@ class DBInterface(object):
except IntegrityError:
session.rollback()
raise DuplicateError("Duplicate unique value detected!")
except:
except Exception:
session.rollback()
raise
finally:
@ -102,7 +119,7 @@ class DBInterface(object):
@sessioned
def get_event_type(self, description, session=None):
t = session.query(models.EventType).filter(
models.EventType.desc == description).first()
models.EventType.desc == description).first()
if t is None:
t = models.EventType(description)
session.add(t)
@ -120,11 +137,11 @@ class DBInterface(object):
@sessioned
def get_event_by_message_id(self, message_id, session=None):
try:
e = session.query(models.Event).\
e = session.query(models.Event). \
filter(models.Event.message_id == message_id).one()
except NoResultFound:
raise NoSuchEventError(
"No event found with message_id %s!" % message_id)
"No event found with message_id %s!" % message_id)
return e.as_dict
@sessioned
@ -137,10 +154,10 @@ class DBInterface(object):
q = session.query(models.Event)
if mark is not None:
if mark.startswith('+'):
order_desc=False
order_desc = False
mark = mark[1:]
if mark.startswith('-'):
order_desc=True
order_desc = True
mark = mark[1:]
if mark:
if order_desc:
@ -158,8 +175,8 @@ class DBInterface(object):
if traits is not None:
for name, val in traits.items():
q = q.filter(models.Event.traits.any(and_(
models.Trait.name == name,
models.Trait.value == val)))
models.Trait.name == name,
models.Trait.value == val)))
if count:
q = q.count()
@ -185,7 +202,7 @@ class DBInterface(object):
@sessioned
def get_stream_by_id(self, stream_id, session=None):
try:
s = session.query(models.Stream).\
s = session.query(models.Stream). \
filter(models.Stream.id == stream_id).one()
except NoResultFound:
raise NoSuchStreamError("No stream found with id %s!" % stream_id)
@ -246,8 +263,8 @@ class DBInterface(object):
q = q.filter(models.Stream.expire_timestamp > current_time)
for name, val in dist_traits.items():
q = q.filter(models.Stream.distinguished_by.any(and_(
models.DistinguishingTrait.name == name,
models.DistinguishingTrait.value == val)))
models.DistinguishingTrait.name == name,
models.DistinguishingTrait.value == val)))
return q.first()
@sessioned
@ -257,12 +274,16 @@ class DBInterface(object):
stream.fire_timestamp = timestamp
@sessioned
def get_ready_streams(self, batch_size, current_time, expire=False, session=None):
def get_ready_streams(self, batch_size, current_time, expire=False,
session=None):
q = session.query(models.Stream)
if expire:
states = (int(models.StreamState.active), int(models.StreamState.retry_expire))
states = (int(models.StreamState.active),
int(models.StreamState.retry_expire))
else:
states = (int(models.StreamState.active), int(models.StreamState.retry_fire))
states = (
int(models.StreamState.active),
int(models.StreamState.retry_fire))
q = q.filter(models.Stream.state.in_(states))
if expire:
@ -275,7 +296,7 @@ class DBInterface(object):
def set_stream_state(self, stream, state):
serial = stream.state_serial_no
stream_id = stream.id
#we do this in a separate session, as it needs to be atomic.
# we do this in a separate session, as it needs to be atomic.
with self.in_session() as session:
q = session.query(models.Stream)
q = q.filter(models.Stream.id == stream_id)
@ -289,7 +310,8 @@ class DBInterface(object):
if stream.state == models.StreamState.error:
return self.set_stream_state(stream, models.StreamState.retry_fire)
if stream.state == models.StreamState.expire_error:
return self.set_stream_state(stream, models.StreamState.retry_expire)
return self.set_stream_state(stream,
models.StreamState.retry_expire)
return stream
@sessioned
@ -304,10 +326,10 @@ class DBInterface(object):
q = session.query(models.Stream)
if mark is not None:
if mark.startswith('+'):
order_desc=False
order_desc = False
mark = mark[1:]
if mark.startswith('-'):
order_desc=True
order_desc = True
mark = mark[1:]
if mark:
if order_desc:
@ -327,8 +349,8 @@ class DBInterface(object):
if distinguishing_traits is not None:
for name, val in distinguishing_traits.items():
q = q.filter(models.Stream.distinguished_by.any(and_(
models.DistinguishingTrait.name == name,
models.DistinguishingTrait.value == val)))
models.DistinguishingTrait.name == name,
models.DistinguishingTrait.value == val)))
if count:
q = q.count()
@ -349,7 +371,8 @@ class DBInterface(object):
info = stream.as_dict
info['_mark'] = mark_fmt % stream.id
if include_events:
info['events'] = self.get_stream_events(stream, session=session)
info['events'] = self.get_stream_events(
stream, session=session)
stream_info.append(info)
return stream_info
@ -358,4 +381,3 @@ class DBInterface(object):
if stream not in session:
session.add(stream)
session.delete(stream)

View File

@ -1,7 +1,23 @@
# Copyright (c) 2014 Dark Secret Software Inc.
# Copyright (c) 2015 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import with_statement
from alembic import context
from sqlalchemy import engine_from_config, pool
from logging.config import fileConfig
# from logging.config import fileConfig
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
@ -9,7 +25,7 @@ config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
#fileConfig(config.config_file_name)
# fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
@ -17,6 +33,7 @@ config = context.config
# target_metadata = mymodel.Base.metadata
from winchester.config import ConfigManager
from winchester.models import Base
target_metadata = Base.metadata
winchester_config = ConfigManager.load_config_file(
@ -26,6 +43,7 @@ winchester_config = ConfigManager.load_config_file(
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline():
"""Run migrations in 'offline' mode.
@ -44,6 +62,7 @@ def run_migrations_offline():
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
@ -52,15 +71,15 @@ def run_migrations_online():
"""
engine = engine_from_config(
winchester_config['database'],
prefix='',
poolclass=pool.NullPool)
winchester_config['database'],
prefix='',
poolclass=pool.NullPool)
connection = engine.connect()
context.configure(
connection=connection,
target_metadata=target_metadata
)
connection=connection,
target_metadata=target_metadata
)
try:
with context.begin_transaction():
@ -68,8 +87,8 @@ def run_migrations_online():
finally:
connection.close()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

View File

@ -75,7 +75,7 @@ class BaseDebugger(object):
class NoOpDebugger(BaseDebugger):
def __init__(self, *args, **kwargs):
self.noop_group = NoOpGroup()
self.noop_group = NoOpGroup()
def reset(self):
pass
@ -136,8 +136,8 @@ class DebugManager(object):
def dump_group(self, debugger, group_name):
group = debugger.get_group(group_name)
logger.info("%s Criteria: %d checks, %d passed" %
(group._name,
group._match + group._mismatch, group._match))
(group._name,
group._match + group._mismatch, group._match))
if debugger.get_debug_level() > 1:
for kv in group._reasons.items():

View File

@ -1,9 +1,25 @@
import logging
# Copyright (c) 2014 Dark Secret Software Inc.
# Copyright (c) 2015 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import datetime
import fnmatch
import logging
import six
import timex
import fnmatch
logger = logging.getLogger(__name__)
@ -19,7 +35,6 @@ def filter_event_timestamps(event):
class Criterion(object):
@classmethod
def get_from_expression(cls, expression, trait_name):
if isinstance(expression, collections.Mapping):
@ -30,7 +45,7 @@ class Criterion(object):
expr = expression[ctype]
if ctype == 'int':
return NumericCriterion(expr, trait_name)
elif ctype =='float':
elif ctype == 'float':
return FloatCriterion(expr, trait_name)
elif ctype == 'datetime':
return TimeCriterion(expr, trait_name)
@ -42,7 +57,7 @@ class Criterion(object):
def __init__(self, expr, trait_name):
self.trait_name = trait_name
#match a constant
# match a constant
self.op = '='
self.value = expr
@ -140,7 +155,8 @@ class Criteria(object):
self.traits = dict()
if 'traits' in config:
for trait, criterion in config['traits'].items():
self.traits[trait] = Criterion.get_from_expression(criterion, trait)
self.traits[trait] = Criterion.get_from_expression(criterion,
trait)
def included_type(self, event_type):
return any(fnmatch.fnmatch(event_type, t) for t in self.included_types)
@ -159,18 +175,18 @@ class Criteria(object):
try:
t = self.timestamp(**filter_event_timestamps(event))
except timex.TimexExpressionError:
# the event doesn't contain a trait referenced in the expression.
# the event doesn't contain a trait referenced
# in the expression.
return debug_group.mismatch("No timestamp trait")
if event['timestamp'] not in t:
return debug_group.mismatch("Not time yet.")
if not self.traits:
return debug_group.match()
return all(criterion.match(event, debug_group) for
criterion in self.traits.values())
criterion in self.traits.values())
class TriggerDefinition(object):
def __init__(self, config, debug_manager):
if 'name' not in config:
raise DefinitionError("Required field in trigger definition not "
@ -181,8 +197,10 @@ class TriggerDefinition(object):
for dt in self.distinguished_by:
if isinstance(dt, collections.Mapping):
if len(dt) > 1:
raise DefinitionError("Invalid distinguising expression "
"%s. Only one trait allowed in an expression" % str(dt))
raise DefinitionError(
"Invalid distinguising expression "
"%s. Only one trait allowed in an expression"
% str(dt))
self.fire_delay = config.get('fire_delay', 0)
if 'expiration' not in config:
raise DefinitionError("Required field in trigger definition not "
@ -195,12 +213,12 @@ class TriggerDefinition(object):
"'expire_pipeline' must be specified in a "
"trigger definition.")
if 'fire_criteria' not in config:
raise DefinitionError("Required criteria in trigger definition not "
"specified 'fire_criteria'")
raise DefinitionError("Required criteria in trigger definition "
"not specified 'fire_criteria'")
self.fire_criteria = [Criteria(c) for c in config['fire_criteria']]
if 'match_criteria' not in config:
raise DefinitionError("Required criteria in trigger definition not "
"specified 'match_criteria'")
raise DefinitionError("Required criteria in trigger definition "
"not specified 'match_criteria'")
self.match_criteria = [Criteria(c) for c in config['match_criteria']]
self.load_criteria = []
if 'load_criteria' in config:
@ -240,9 +258,11 @@ class TriggerDefinition(object):
d_expr = timex.parse(dt[trait_name])
else:
trait_name = dt
event_trait_name = matching_criteria.map_distinguished_by.get(trait_name, trait_name)
event_trait_name = matching_criteria.map_distinguished_by.get(
trait_name, trait_name)
if d_expr is not None:
dist_traits[trait_name] = d_expr(timestamp=event[event_trait_name])
dist_traits[trait_name] = d_expr(
timestamp=event[event_trait_name])
else:
dist_traits[trait_name] = event[event_trait_name]
return dist_traits

View File

@ -1,25 +1,44 @@
# Copyright (c) 2014 Dark Secret Software Inc.
# Copyright (c) 2015 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import calendar
from datetime import datetime
from decimal import Decimal
import calendar
from enum import IntEnum
import timex
from sqlalchemy import event
from sqlalchemy import and_, or_
from sqlalchemy import literal_column
from sqlalchemy import Column, Table, ForeignKey, Index, UniqueConstraint
from sqlalchemy import Float, Boolean, Text, DateTime, Integer, String
from sqlalchemy import cast, null, case
from sqlalchemy.orm.interfaces import PropComparator
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy import and_
from sqlalchemy import Column
from sqlalchemy.dialects.mysql import DECIMAL
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import composite
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy import Float
from sqlalchemy import ForeignKey
from sqlalchemy import Index
from sqlalchemy import Integer
from sqlalchemy.orm import backref
from sqlalchemy.orm import relationship
from sqlalchemy.orm.collections import attribute_mapped_collection
from sqlalchemy.orm import composite
from sqlalchemy.orm.interfaces import PropComparator
from sqlalchemy.orm import relationship
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy.types import TypeDecorator, DATETIME
@ -166,7 +185,8 @@ class PolymorphicVerticalProperty(object):
@hybrid_property
def value(self):
if self.type not in self.ATTRIBUTE_MAP:
raise InvalidTraitType("Invalid trait type in db for %s: %s" % (self.name, self.type))
raise InvalidTraitType(
"Invalid trait type in db for %s: %s" % (self.name, self.type))
attribute = self.ATTRIBUTE_MAP[self.type]
if attribute is None:
return None
@ -180,7 +200,8 @@ class PolymorphicVerticalProperty(object):
def value(self, value):
datatype, value = self.get_type_value(value)
if datatype not in self.ATTRIBUTE_MAP:
raise InvalidTraitType("Invalid trait type for %s: %s" % (self.name, datatype))
raise InvalidTraitType(
"Invalid trait type for %s: %s" % (self.name, datatype))
attribute = self.ATTRIBUTE_MAP[datatype]
self.type = int(datatype)
if attribute is not None:
@ -239,13 +260,13 @@ class Trait(PolymorphicVerticalProperty, Base):
Datatype.string: 't_string',
Datatype.int: 't_int',
Datatype.float: 't_float',
Datatype.datetime: 't_datetime',}
Datatype.datetime: 't_datetime', }
t_string = Column(String(255), nullable=True, default=None)
t_float = Column(Float, nullable=True, default=None)
t_int = Column(Integer, nullable=True, default=None)
t_datetime = Column(PreciseTimestamp(),
nullable=True, default=None)
nullable=True, default=None)
def __repr__(self):
return "<Trait(%s) %s=%s/%s/%s/%s on %s>" % (self.name,
@ -286,9 +307,11 @@ class Event(ProxiedDictMixin, Base):
event_type = relationship("EventType", backref=backref('event_type'))
traits = relationship("Trait",
collection_class=attribute_mapped_collection('name'))
collection_class=attribute_mapped_collection('name'))
_proxied = association_proxy("traits", "value",
creator=lambda name, value: Trait(name=name, value=value))
creator=lambda name, value: Trait(
name=name,
value=value))
@property
def event_type_string(self):
@ -303,24 +326,23 @@ class Event(ProxiedDictMixin, Base):
return d
def __init__(self, message_id, event_type, generated):
self.message_id = message_id
self.event_type = event_type
self.generated = generated
self.message_id = message_id
self.event_type = event_type
self.generated = generated
def __repr__(self):
return "<Event %s ('Event : %s %s, Generated: %s')>" % (self.id,
self.message_id,
self.event_type,
self.generated)
return "<Event %s ('Event : %s %s, Generated: %s')>" % (
self.id,
self.message_id,
self.event_type,
self.generated)
stream_event_table = Table('streamevent', Base.metadata,
Column('stream_id', Integer, ForeignKey('stream.id'), primary_key=True),
Column('event_id', Integer,
ForeignKey('event.id'),
primary_key=True)
)
stream_event_table = Table(
'streamevent', Base.metadata,
Column('stream_id', Integer, ForeignKey('stream.id'), primary_key=True),
Column('event_id', Integer, ForeignKey('event.id'), primary_key=True)
)
class Stream(ProxiedDictMixin, Base):
@ -341,11 +363,15 @@ class Stream(ProxiedDictMixin, Base):
state = Column(Integer, default=StreamState.active, nullable=False)
state_serial_no = Column(Integer, default=0, nullable=False)
distinguished_by = relationship("DistinguishingTrait",
cascade="save-update, merge, delete, delete-orphan",
collection_class=attribute_mapped_collection('name'))
_proxied = association_proxy("distinguished_by", "value",
creator=lambda name, value: DistinguishingTrait(name=name, value=value))
distinguished_by = relationship(
"DistinguishingTrait",
cascade="save-update, merge, delete, delete-orphan",
collection_class=attribute_mapped_collection(
'name'))
_proxied = association_proxy(
"distinguished_by", "value",
creator=lambda name, value: DistinguishingTrait(
name=name, value=value))
events = relationship(Event, secondary=stream_event_table,
order_by=Event.generated)
@ -365,10 +391,9 @@ class Stream(ProxiedDictMixin, Base):
'expire_timestamp': self.expire_timestamp,
'distinguishing_traits': self.distinguished_by_dict}
def __init__(self, name, first_event, last_event=None, expire_timestamp=None,
fire_timestamp=None, state=None, state_serial_no=None):
def __init__(self, name, first_event, last_event=None,
expire_timestamp=None,
fire_timestamp=None, state=None, state_serial_no=None):
self.name = name
self.first_event = first_event
if last_event is None:
@ -398,21 +423,22 @@ class DistinguishingTrait(PolymorphicVerticalProperty, Base):
name = Column(String(100), primary_key=True)
type = Column(Integer)
ATTRIBUTE_MAP = {Datatype.none: None,
Datatype.string: 'dt_string',
Datatype.int: 'dt_int',
Datatype.float: 'dt_float',
Datatype.datetime: 'dt_datetime',
Datatype.timerange:'dt_timerange',
}
ATTRIBUTE_MAP = {
Datatype.none: None,
Datatype.string: 'dt_string',
Datatype.int: 'dt_int',
Datatype.float: 'dt_float',
Datatype.datetime: 'dt_datetime',
Datatype.timerange: 'dt_timerange',
}
dt_string = Column(String(255), nullable=True, default=None)
dt_float = Column(Float, nullable=True, default=None)
dt_int = Column(Integer, nullable=True, default=None)
dt_datetime = Column(PreciseTimestamp(),
nullable=True, default=None)
dt_timerange_begin = Column(PreciseTimestamp(), nullable=True, default=None)
nullable=True, default=None)
dt_timerange_begin = Column(
PreciseTimestamp(), nullable=True, default=None)
dt_timerange_end = Column(PreciseTimestamp(), nullable=True, default=None)
dt_timerange = composite(DBTimeRange, dt_timerange_begin, dt_timerange_end)
@ -422,12 +448,13 @@ class DistinguishingTrait(PolymorphicVerticalProperty, Base):
return {self.name: self.value}
def __repr__(self):
return "<DistinguishingTrait(%s) %s=%s/%s/%s/%s/(%s to %s) on %s>" % (self.name,
self.type,
self.dt_string,
self.dt_float,
self.dt_int,
self.dt_datetime,
self.dt_timerange_begin,
self.dt_timerange_end,
self.stream_id)
return ("<DistinguishingTrait(%s) %s=%s/%s/%s/%s/(%s to %s) on %s>"
% (self.name,
self.type,
self.dt_string,
self.dt_float,
self.dt_int,
self.dt_datetime,
self.dt_timerange_begin,
self.dt_timerange_end,
self.stream_id))

View File

@ -1,3 +1,19 @@
# Copyright (c) 2014 Dark Secret Software Inc.
# Copyright (c) 2015 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import datetime
import logging
@ -16,63 +32,63 @@ class PipelineHandlerBase(object):
Pipeline handlers perform the actual processing on a set of events
captured by a stream. The handlers are chained together, each handler
in a pipeline is called in order, and receives the output of the previous
handler.
in a pipeline is called in order, and receives the output of the
previous handler.
Once all of the handlers in a pipeline have successfully processed the
events (with .handle_events() ), each handler's .commit() method will be
called. If any handler in the chain raises an exception, processing of
events will stop, and each handler's .rollback() method will be called."""
events will stop, and each handler's .rollback() method will be called.
"""
def __init__(self, **kw):
"""Setup the pipeline handler.
"""Setup the pipeline handler.
A new instance of each handler for a pipeline is used for each
stream (set of events) processed.
A new instance of each handler for a pipeline is used for each
stream (set of events) processed.
:param kw: The parameters listed in the pipeline config file for
this handler (if any).
"""
:param kw: The parameters listed in the pipeline config file for
this handler (if any).
"""
@abc.abstractmethod
def handle_events(self, events, env):
""" This method handles the actual event processing.
"""This method handles the actual event processing.
This method receives a list of events and should return a list of
events as well. The return value of this method will be passed to
the next handler's .handle_events() method. Handlers can add new
events simply by adding them to the list they return. New events
(those with unrecognized message_id's), will be saved to the
database if all handlers in this pipeline complete successfully.
Likewise, handlers can omit events from the list they return to
act as a filter for downstream handlers.
This method receives a list of events and should return a list of
events as well. The return value of this method will be passed to
the next handler's .handle_events() method. Handlers can add new
events simply by adding them to the list they return. New events
(those with unrecognized message_id's), will be saved to the
database if all handlers in this pipeline complete successfully.
Likewise, handlers can omit events from the list they return to
act as a filter for downstream handlers.
Care should be taken to avoid any operation with side-effects in
this method. Pipelines can be re-tried if a handler throws an
error. If you need to perform such operations, such as interacting
with an external system, save the needed information in an instance
variable, and perform the operation in the .commit() method.
Care should be taken to avoid any operation with side-effects in
this method. Pipelines can be re-tried if a handler throws an
error. If you need to perform such operations, such as interacting
with an external system, save the needed information in an instance
variable, and perform the operation in the .commit() method.
:param events: A list of events.
:param env: Just a dictionary, it's passed to each handler, and
can act as a shared scratchpad.
:param events: A list of events.
:param env: Just a dictionary, it's passed to each handler, and
can act as a shared scratchpad.
:returns: A list of events.
:returns: A list of events.
"""
@abc.abstractmethod
def commit(self):
""" Called when each handler in this pipeline has successfully
completed.
"""Called when each handler in this pipeline successfully completes.
If you have operations with side effects, preform them here.
Exceptions raised here will be logged, but otherwise ignored.
If you have operations with side effects, preform them here.
Exceptions raised here will be logged, but otherwise ignored.
"""
@abc.abstractmethod
def rollback(self):
""" Called if there is an error for any handler while processing a list
of events.
"""Called if error in any handler while processing a list of events.
If you need to perform some kind of cleanup, do it here.
Exceptions raised here will be logged, but otherwise ignored.
@ -80,10 +96,9 @@ class PipelineHandlerBase(object):
class LoggingHandler(PipelineHandlerBase):
def handle_events(self, events, env):
emsg = ', '.join("%s: %s" % (event['event_type'], event['message_id'])
for event in events)
for event in events)
logger.info("Received %s events: \n%s" % (len(events), emsg))
return events
@ -130,8 +145,8 @@ class ConnectionManager(object):
exchange_dict, exchange_tuple)
def get_connection(self, properties, queue_name):
connection_dict, connection_tuple, \
exchange_dict, exchange_tuple = self._extract_params(properties)
(connection_dict, connection_tuple,
exchange_dict, exchange_tuple) = self._extract_params(properties)
connection_info = self.pool.get(connection_tuple)
if connection_info is None:
connection = driver.create_connection(connection_dict['host'],
@ -175,7 +190,7 @@ class NotabeneHandler(PipelineHandlerBase):
if self.queue_name is None:
raise NotabeneException("No 'queue_name' provided")
self.connection, self.exchange = connection_manager.get_connection(
kw, self.queue_name)
kw, self.queue_name)
self.env_keys = kw.get('env_keys', [])
@ -189,8 +204,8 @@ class NotabeneHandler(PipelineHandlerBase):
def commit(self):
for notification in self.pending_notifications:
logger.info("Publishing '%s' to '%s' with routing_key '%s'" %
(notification['event_type'], self.exchange,
self.queue_name))
(notification['event_type'], self.exchange,
self.queue_name))
try:
driver.send_notification(notification, self.queue_name,
self.connection, self.exchange)
@ -228,7 +243,7 @@ class UsageHandler(PipelineHandlerBase):
# we'll do that later.
apb, ape = self._get_audit_period(event)
return (self._is_exists(event) and apb and ape
and ape.date() != (apb.date() + datetime.timedelta(days=1)))
and ape.date() != (apb.date() + datetime.timedelta(days=1)))
def _is_EOD_exists(self, event):
# We could have several .exists records, but only the
@ -236,9 +251,9 @@ class UsageHandler(PipelineHandlerBase):
# 00:00:00 and be 24hrs apart.
apb, ape = self._get_audit_period(event)
return (self._is_exists(event) and apb and ape
and apb.time() == datetime.time(0, 0, 0)
and ape.time() == datetime.time(0, 0, 0)
and ape.date() == (apb.date() + datetime.timedelta(days=1)))
and apb.time() == datetime.time(0, 0, 0)
and ape.time() == datetime.time(0, 0, 0)
and ape.date() == (apb.date() + datetime.timedelta(days=1)))
def _extract_launched_at(self, exists):
if not exists.get('launched_at'):
@ -247,7 +262,7 @@ class UsageHandler(PipelineHandlerBase):
def _extract_interesting_events(self, events, interesting):
return [event for event in events
if event['event_type'] in interesting]
if event['event_type'] in interesting]
def _find_deleted_events(self, events):
interesting = ['compute.instance.delete.end']
@ -259,8 +274,8 @@ class UsageHandler(PipelineHandlerBase):
continue
if this[field] != that[field]:
raise UsageException("U2",
"Conflicting '%s' values ('%s' != '%s')"
% (field, this[field], that[field]))
"Conflicting '%s' values ('%s' != '%s')"
% (field, this[field], that[field]))
def _confirm_delete(self, exists, delete_events, fields):
deleted_at = exists.get('deleted_at')
@ -269,23 +284,26 @@ class UsageHandler(PipelineHandlerBase):
if not deleted_at and delete_events:
raise UsageException("U6", ".deleted events found but .exists "
"has no deleted_at value.")
"has no deleted_at value.")
if deleted_at and state != "deleted":
raise UsageException("U3", ".exists state not 'deleted' but "
".exists deleted_at is set.")
".exists deleted_at is set.")
if deleted_at and not delete_events:
# We've already confirmed it's in the "deleted" state.
launched_at = exists.get('launched_at')
if deleted_at < launched_at:
raise UsageException("U4",
".exists deleted_at < .exists launched_at.")
raise UsageException(
"U4",
".exists deleted_at < .exists launched_at.")
# Is the deleted_at within this audit period?
if (apb and ape and deleted_at >= apb and deleted_at <= ape):
raise UsageException("U5", ".exists deleted_at in audit "
"period, but no matching .delete event found.")
raise UsageException("U5",
".exists deleted_at in audit "
"period, but no matching .delete "
"event found.")
if len(delete_events) > 1:
raise UsageException("U7", "Multiple .delete.end events")
@ -303,18 +321,15 @@ class UsageHandler(PipelineHandlerBase):
# If so, we should have a related event. Otherwise, this
# instance was created previously.
launched_at = self._extract_launched_at(exists)
if (apb and ape and
launched_at >= apb and launched_at <= ape and
len(block) == 0):
raise UsageException("U8", ".exists launched_at in audit "
"period, but no related events found.")
if apb and ape and apb <= launched_at <= ape and len(block) == 0:
raise UsageException("U8", ".exists launched_at in audit "
"period, but no related events found.")
# TODO(sandy): Confirm the events we got set launched_at
# properly.
# TODO(sandy): Confirm the events we got set launched_at
# properly.
def _get_core_fields(self):
"""Broken out so derived classes can define their
own trait list."""
"""Broken out so derived classes can define their own trait list."""
return ['launched_at', 'instance_flavor_id', 'tenant_id',
'os_architecture', 'os_version', 'os_distro']
@ -346,27 +361,29 @@ class UsageHandler(PipelineHandlerBase):
apb, ape = self._get_audit_period(exists)
return {
'payload': {
'audit_period_beginning': str(apb),
'audit_period_ending': str(ape),
'launched_at': str(exists.get('launched_at', '')),
'deleted_at': str(exists.get('deleted_at', '')),
'instance_id': exists.get('instance_id', ''),
'tenant_id': exists.get('tenant_id', ''),
'display_name': exists.get('display_name', ''),
'instance_type': exists.get('instance_flavor', ''),
'instance_flavor_id': exists.get('instance_flavor_id', ''),
'state': exists.get('state', ''),
'state_description': exists.get('state_description', ''),
'bandwidth': {'public': {
'bw_in': exists.get('bandwidth_in', 0),
'bw_out': exists.get('bandwidth_out', 0)}},
'image_meta': {
'org.openstack__1__architecture':
exists.get('os_architecture', ''),
'org.openstack__1__os_version': exists.get('os_version', ''),
'org.openstack__1__os_distro': exists.get('os_distro', ''),
'org.rackspace__1__options': exists.get('rax_options', '0')
}},
'audit_period_beginning': str(apb),
'audit_period_ending': str(ape),
'launched_at': str(exists.get('launched_at', '')),
'deleted_at': str(exists.get('deleted_at', '')),
'instance_id': exists.get('instance_id', ''),
'tenant_id': exists.get('tenant_id', ''),
'display_name': exists.get('display_name', ''),
'instance_type': exists.get('instance_flavor', ''),
'instance_flavor_id': exists.get('instance_flavor_id', ''),
'state': exists.get('state', ''),
'state_description': exists.get('state_description', ''),
'bandwidth': {'public': {
'bw_in': exists.get('bandwidth_in', 0),
'bw_out': exists.get('bandwidth_out', 0)}},
'image_meta': {
'org.openstack__1__architecture': exists.get(
'os_architecture', ''),
'org.openstack__1__os_version': exists.get('os_version',
''),
'org.openstack__1__os_distro': exists.get('os_distro', ''),
'org.rackspace__1__options': exists.get('rax_options', '0')
}
},
'original_message_id': exists.get('message_id', '')}
def _process_block(self, block, exists):
@ -378,7 +395,7 @@ class UsageHandler(PipelineHandlerBase):
error = e
event_type = "compute.instance.exists.failed"
logger.warn("Stream %s UsageException: (%s) %s" %
(self.stream_id, e.code, e))
(self.stream_id, e.code, e))
apb, ape = self._get_audit_period(exists)
logger.warn("Stream %s deleted_at: %s, launched_at: %s, "
"state: %s, APB: %s, APE: %s, #events: %s" %
@ -388,10 +405,10 @@ class UsageHandler(PipelineHandlerBase):
if len(block) > 1:
logger.warn("%s - events (stream: %s)"
% (event_type, self.stream_id))
% (event_type, self.stream_id))
for event in block:
logger.warn("^Event: %s - %s" %
(event['timestamp'], event['event_type']))
(event['timestamp'], event['event_type']))
events = []
# We could have warnings, but a valid event list.
@ -400,10 +417,11 @@ class UsageHandler(PipelineHandlerBase):
warning_event = {'event_type': 'compute.instance.exists.warnings',
'publisher_id': 'stv3',
'message_id': str(uuid.uuid4()),
'timestamp': exists.get('timestamp',
datetime.datetime.utcnow()),
'timestamp': exists.get(
'timestamp',
datetime.datetime.utcnow()),
'stream_id': int(self.stream_id),
'instance_id': exists.get('instance_id'),
'instance_id': instance_id,
'warnings': self.warnings}
events.append(warning_event)
@ -412,7 +430,7 @@ class UsageHandler(PipelineHandlerBase):
'message_id': str(uuid.uuid4()),
'publisher_id': 'stv3',
'timestamp': exists.get('timestamp',
datetime.datetime.utcnow()),
datetime.datetime.utcnow()),
'stream_id': int(self.stream_id),
'error': str(error),
'error_code': error and error.code})
@ -435,16 +453,17 @@ class UsageHandler(PipelineHandlerBase):
# Final block should be empty.
if block:
new_event = {'event_type': "compute.instance.exists.failed",
'message_id': str(uuid.uuid4()),
'timestamp': block[0].get('timestamp',
datetime.datetime.utcnow()),
'stream_id': int(self.stream_id),
'instance_id': block[0].get('instance_id'),
'error': "Notifications, but no .exists "
"notification found.",
'error_code': "U0"
}
new_event = {
'event_type': "compute.instance.exists.failed",
'message_id': str(uuid.uuid4()),
'timestamp': block[0].get('timestamp',
datetime.datetime.utcnow()),
'stream_id': int(self.stream_id),
'instance_id': block[0].get('instance_id'),
'error': "Notifications, but no .exists "
"notification found.",
'error_code': "U0"
}
new_events.append(new_event)
env['usage_notifications'] = new_events

View File

@ -1,11 +1,29 @@
import time
# Copyright (c) 2014 Dark Secret Software Inc.
# Copyright (c) 2015 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import random
import simport
import six
import time
from winchester.db import DBInterface, DuplicateError, LockError
from winchester.config import ConfigManager, ConfigSection, ConfigItem
from winchester.config import ConfigItem
from winchester.config import ConfigManager
from winchester.db import DBInterface
from winchester.db import LockError
from winchester.definition import TriggerDefinition
from winchester.models import StreamState
from winchester import time_sync as ts
@ -21,7 +39,8 @@ class PipelineError(Exception):
class PipelineExecutionError(PipelineError):
def __init__(self, msg="", cause=None):
super(PipelineExecutionError, self).__init__("%s: caused by %s" % (msg, repr(cause)))
super(PipelineExecutionError, self).__init__(
"%s: caused by %s" % (msg, repr(cause)))
self.cause = cause
@ -30,17 +49,18 @@ class PipelineConfigError(PipelineError):
class Pipeline(object):
@classmethod
def check_handler_config(cls, conf, handler_map):
if isinstance(conf, six.string_types):
conf = dict(name=conf, params=dict())
if 'name' not in conf:
raise PipelineConfigError("Handler name not in config! %s" % str(conf))
raise PipelineConfigError(
"Handler name not in config! %s" % str(conf))
if 'params' not in conf:
conf['params'] = {}
if conf['name'] not in handler_map:
raise PipelineConfigError("Unknown handler in pipeline config %s" % conf['name'])
raise PipelineConfigError(
"Unknown handler in pipeline config %s" % conf['name'])
return conf
def __init__(self, name, config, handler_map):
@ -54,8 +74,9 @@ class Pipeline(object):
try:
handler = handler_class(**params)
except Exception as e:
logger.exception("Error initalizing handler %s for pipeline %s" %
(handler_class, self.name))
logger.exception(
"Error initalizing handler %s for pipeline %s" %
(handler_class, self.name))
raise PipelineExecutionError("Error loading pipeline", e)
self.handlers.append(handler)
@ -66,11 +87,11 @@ class Pipeline(object):
for handler in self.handlers:
events = handler.handle_events(events, self.env)
debugger.bump_counter("Pre-commit successful")
except Exception as e:
except Exception as err:
logger.exception("Error processing pipeline %s" % self.name)
debugger.bump_counter("Pipeline error")
self.rollback(debugger)
raise PipelineExecutionError("Error in pipeline", e)
raise PipelineExecutionError("Error in pipeline", err)
new_events = [e for e in events if e['message_id'] not in event_ids]
self.commit(debugger)
return new_events
@ -80,56 +101,59 @@ class Pipeline(object):
try:
handler.commit()
debugger.bump_counter("Commit successful")
except:
except Exception:
debugger.bump_counter("Commit error")
logger.exception("Commit error on handler in pipeline %s" % self.name)
logger.exception(
"Commit error on handler in pipeline %s" % self.name)
def rollback(self, debugger):
for handler in self.handlers:
try:
handler.rollback()
debugger.bump_counter("Rollback successful")
except:
except Exception:
debugger.bump_counter("Rollback error")
logger.exception("Rollback error on handler in pipeline %s" % self.name)
logger.exception(
"Rollback error on handler in pipeline %s" % self.name)
class PipelineManager(object):
@classmethod
def config_description(cls):
configs = TriggerManager.config_description()
configs.update(dict(
pipeline_handlers=ConfigItem(required=True,
help="dictionary of pipeline handlers to load "
"Classes specified with simport syntax. "
"simport docs for more info"),
pipeline_worker_batch_size=ConfigItem(
help="Number of streams for pipeline "
"worker(s) to load at a time",
default=1000),
pipeline_worker_delay=ConfigItem(
help="Number of seconds for pipeline worker "
"to sleep when it finds no streams to "
"process", default=10),
pipeline_config=ConfigItem(required=True,
pipeline_handlers=ConfigItem(
required=True,
help="dictionary of pipeline handlers to load "
"Classes specified with simport syntax. "
"simport docs for more info"),
pipeline_worker_batch_size=ConfigItem(
help="Number of streams for pipeline "
"worker(s) to load at a time",
default=1000),
pipeline_worker_delay=ConfigItem(
help="Number of seconds for pipeline worker "
"to sleep when it finds no streams to "
"process", default=10),
pipeline_config=ConfigItem(required=True,
help="Name of pipeline config file "
"defining the handlers for each "
"pipeline."),
purge_completed_streams=ConfigItem(
help="Delete successfully proccessed "
"streams when finished?",
default=True),
))
purge_completed_streams=ConfigItem(
help="Delete successfully proccessed "
"streams when finished?",
default=True),
))
return configs
def __init__(self, config, db=None, pipeline_handlers=None,
pipeline_config=None, trigger_defs=None, time_sync=None,
proc_name='pipeline_worker'):
#name used to distinguish worker processes in logs
# name used to distinguish worker processes in logs
self.proc_name = proc_name
logger.debug("PipelineManager(%s): Using config: %s" % (self.proc_name, str(config)))
logger.debug("PipelineManager(%s): Using config: %s"
% (self.proc_name, str(config)))
config = ConfigManager.wrap(config, self.config_description())
self.config = config
config.check_config()
@ -146,7 +170,8 @@ class PipelineManager(object):
if pipeline_handlers is not None:
self.pipeline_handlers = pipeline_handlers
else:
self.pipeline_handlers = self._load_plugins(config['pipeline_handlers'])
self.pipeline_handlers = self._load_plugins(
config['pipeline_handlers'])
logger.debug("Pipeline handlers: %s" % str(self.pipeline_handlers))
if pipeline_config is not None:
@ -156,21 +181,25 @@ class PipelineManager(object):
logger.debug("Pipeline config: %s" % str(self.pipeline_config))
for pipeline, handler_configs in self.pipeline_config.items():
self.pipeline_config[pipeline] = [Pipeline.check_handler_config(conf,
self.pipeline_handlers)
for conf in handler_configs]
self.pipeline_config[pipeline] = [
Pipeline.check_handler_config(conf,
self.pipeline_handlers)
for conf in handler_configs]
if trigger_defs is not None:
self.trigger_definitions = trigger_defs
else:
defs = config.load_file(config['trigger_definitions'])
logger.debug("Loaded trigger definitions %s" % str(defs))
self.trigger_definitions = [TriggerDefinition(conf, None) for conf in defs]
self.trigger_map = dict((tdef.name, tdef) for tdef in self.trigger_definitions)
self.trigger_definitions = [TriggerDefinition(conf, None) for conf
in defs]
self.trigger_map = dict(
(tdef.name, tdef) for tdef in self.trigger_definitions)
self.trigger_manager = TriggerManager(self.config, db=self.db,
trigger_defs=self.trigger_definitions,
time_sync=time_sync)
self.trigger_manager = TriggerManager(
self.config, db=self.db,
trigger_defs=self.trigger_definitions,
time_sync=time_sync)
self.pipeline_worker_batch_size = config['pipeline_worker_batch_size']
self.pipeline_worker_delay = config['pipeline_worker_delay']
@ -195,8 +224,8 @@ class PipelineManager(object):
except (simport.MissingMethodOrFunction,
simport.MissingModule,
simport.BadDirectory) as e:
log.error("Could not load plugin %s: Not found. %s" % (
name, e))
logger.error("Could not load plugin %s: Not found. %s" % (
name, e))
return plugins
def current_time(self):
@ -205,7 +234,7 @@ class PipelineManager(object):
def _log_statistics(self):
logger.info("Loaded %s streams. Fired %s, Expired %s." % (
self.streams_loaded, self.streams_fired, self.streams_expired))
self.streams_loaded, self.streams_fired, self.streams_expired))
self.streams_fired = 0
self.streams_expired = 0
self.streams_loaded = 0
@ -222,11 +251,12 @@ class PipelineManager(object):
events = self.db.get_stream_events(stream)
debugger = trigger_def.debugger
try:
pipeline = Pipeline(pipeline_name, pipeline_config, self.pipeline_handlers)
pipeline = Pipeline(pipeline_name, pipeline_config,
self.pipeline_handlers)
new_events = pipeline.handle_events(events, stream, debugger)
except PipelineExecutionError:
logger.error("Exception in pipeline %s handling stream %s" % (
pipeline_name, stream.id))
pipeline_name, stream.id))
return False
if new_events:
self.add_new_events(new_events)
@ -239,8 +269,9 @@ class PipelineManager(object):
try:
self.db.set_stream_state(stream, StreamState.completed)
except LockError:
logger.error("Stream %s locked while trying to set 'complete' state! "
"This should not happen." % stream.id)
logger.error(
"Stream %s locked while trying to set 'complete' state! "
"This should not happen." % stream.id)
def _error_stream(self, stream):
try:
@ -253,8 +284,9 @@ class PipelineManager(object):
try:
self.db.set_stream_state(stream, StreamState.expire_error)
except LockError:
logger.error("Stream %s locked while trying to set 'expire_error' state! "
"This should not happen." % stream.id)
logger.error(
"Stream %s locked while trying to set 'expire_error' state! "
"This should not happen." % stream.id)
def safe_get_debugger(self, trigger_def):
return trigger_def.debugger if trigger_def is not None else \
@ -295,7 +327,7 @@ class PipelineManager(object):
debugger.bump_counter("No fire pipeline for '%s'" % stream.name)
self._complete_stream(stream)
debugger.bump_counter("Streams fired")
self.streams_fired +=1
self.streams_fired += 1
return True
def expire_stream(self, stream):
@ -311,7 +343,7 @@ class PipelineManager(object):
if trigger_def is None:
debugger.bump_counter("Unknown trigger def '%s'" % stream.name)
logger.error("Stream %s has unknown trigger definition %s" % (
stream.id, stream.name))
stream.id, stream.name))
self._expire_error_stream(stream)
return False
pipeline = trigger_def.expire_pipeline
@ -319,8 +351,9 @@ class PipelineManager(object):
pipe_config = self.pipeline_config.get(pipeline)
if pipe_config is None:
debugger.bump_counter("Unknown pipeline '%s'" % pipeline)
logger.error("Trigger %s for stream %s has unknown pipeline %s" % (
stream.name, stream.id, pipeline))
logger.error(
"Trigger %s for stream %s has unknown pipeline %s" % (
stream.name, stream.id, pipeline))
self._expire_error_stream(stream)
if not self._run_pipeline(stream, trigger_def, pipeline,
pipe_config):
@ -328,11 +361,11 @@ class PipelineManager(object):
return False
else:
logger.debug("No expire pipeline for stream %s. Nothing to do." % (
stream.id))
stream.id))
debugger.bump_counter("No expire pipeline for '%s'" % stream.name)
self._complete_stream(stream)
debugger.bump_counter("Streams expired")
self.streams_expired +=1
self.streams_expired += 1
return True
def process_ready_streams(self, batch_size, expire=False):
@ -355,11 +388,14 @@ class PipelineManager(object):
def run(self):
while True:
fire_ct = self.process_ready_streams(self.pipeline_worker_batch_size)
expire_ct = self.process_ready_streams(self.pipeline_worker_batch_size,
expire=True)
fire_ct = self.process_ready_streams(
self.pipeline_worker_batch_size)
expire_ct = self.process_ready_streams(
self.pipeline_worker_batch_size,
expire=True)
if (self.current_time() - self.last_status).seconds > self.statistics_period:
if ((self.current_time() - self.last_status).seconds
> self.statistics_period):
self._log_statistics()
if not fire_ct and not expire_ct:

View File

@ -1,3 +1,19 @@
# Copyright (c) 2014 Dark Secret Software Inc.
# Copyright (c) 2015 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import dateutil.parser
import logging

View File

@ -1,11 +1,31 @@
# Copyright (c) 2014 Dark Secret Software Inc.
# Copyright (c) 2015 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import logging
from stackdistiller import distiller, condenser
import simport
from stackdistiller import condenser
from stackdistiller import distiller
from winchester.config import ConfigManager, ConfigSection, ConfigItem
from winchester.config import ConfigItem
from winchester.config import ConfigManager
from winchester.config import ConfigSection
from winchester.db import DBInterface
from winchester.db import DuplicateError
from winchester import debugging
from winchester.db import DBInterface, DuplicateError
from winchester.definition import TriggerDefinition
from winchester import time_sync as ts
@ -43,10 +63,12 @@ class EventCondenser(condenser.CondenserBase):
def _fix_time(self, dt):
"""Stackdistiller converts all times to utc.
We store timestamps as utc datetime. However, the explicit
UTC timezone on incoming datetimes causes comparison issues
deep in sqlalchemy. We fix this by converting all datetimes
to naive utc timestamps"""
to naive utc timestamps
"""
if dt.tzinfo is not None:
dt = dt.replace(tzinfo=None)
return dt
@ -67,36 +89,40 @@ class TriggerManager(object):
@classmethod
def config_description(cls):
return dict(config_path=ConfigItem(
help="Path(s) to find additional config files",
multiple=True, default='.'),
distiller_config=ConfigItem(required=True,
help="Name of distiller config file "
"describing what to extract from the "
"notifications"),
distiller_trait_plugins=ConfigItem(
help="dictionary of trait plugins to load "
"for stackdistiller. Classes specified with "
"simport syntax. See stackdistiller and "
"simport docs for more info", default=dict()),
time_sync_endpoint=ConfigItem(
help="URL of time sync service for use with"
" replying old events.",
default=None),
catch_all_notifications=ConfigItem(
help="Store basic info for all notifications,"
" even if not listed in distiller config",
default=False),
statistics_period=ConfigItem(
help="Emit stats on event counts, etc every "
"this many seconds", default=10),
database=ConfigSection(help="Database connection info.",
config_description=DBInterface.config_description()),
trigger_definitions=ConfigItem(required=True,
help="Name of trigger definitions file "
"defining trigger conditions and what events to "
"process for each stream"),
)
return dict(
config_path=ConfigItem(
help="Path(s) to find additional config files",
multiple=True, default='.'),
distiller_config=ConfigItem(
required=True,
help="Name of distiller config file "
"describing what to extract from the "
"notifications"),
distiller_trait_plugins=ConfigItem(
help="dictionary of trait plugins to load "
"for stackdistiller. Classes specified with "
"simport syntax. See stackdistiller and "
"simport docs for more info", default=dict()),
time_sync_endpoint=ConfigItem(
help="URL of time sync service for use with"
" replying old events.",
default=None),
catch_all_notifications=ConfigItem(
help="Store basic info for all notifications,"
" even if not listed in distiller config",
default=False),
statistics_period=ConfigItem(
help="Emit stats on event counts, etc every "
"this many seconds", default=10),
database=ConfigSection(
help="Database connection info.",
config_description=DBInterface.config_description()),
trigger_definitions=ConfigItem(
required=True,
help="Name of trigger definitions file "
"defining trigger conditions and what events to "
"process for each stream"),
)
def __init__(self, config, db=None, stackdistiller=None, trigger_defs=None,
time_sync=None):
@ -119,9 +145,10 @@ class TriggerManager(object):
dist_config = config.load_file(config['distiller_config'])
plugmap = self._load_plugins(config['distiller_trait_plugins'],
distiller.DEFAULT_PLUGINMAP)
self.distiller = distiller.Distiller(dist_config,
trait_plugin_map=plugmap,
catchall=config['catch_all_notifications'])
self.distiller = distiller.Distiller(
dist_config,
trait_plugin_map=plugmap,
catchall=config['catch_all_notifications'])
if trigger_defs is not None:
self.trigger_definitions = trigger_defs
for t in self.trigger_definitions:
@ -129,8 +156,8 @@ class TriggerManager(object):
else:
defs = config.load_file(config['trigger_definitions'])
self.trigger_definitions = [TriggerDefinition(conf,
self.debug_manager)
for conf in defs]
self.debug_manager)
for conf in defs]
self.saved_events = 0
self.received = 0
self.last_status = self.current_time()
@ -144,13 +171,13 @@ class TriggerManager(object):
try:
plugins[name] = simport.load(cls_string)
except simport.ImportFailed as e:
log.error("Could not load plugin %s: Import failed. %s" % (
name, e))
logger.error("Could not load plugin %s: Import failed. %s" % (
name, e))
except (simport.MissingMethodOrFunction,
simport.MissingModule,
simport.BadDirectory) as e:
log.error("Could not load plugin %s: Not found. %s" % (
name, e))
logger.error("Could not load plugin %s: Not found. %s" % (
name, e))
return plugins
def current_time(self):
@ -185,9 +212,11 @@ class TriggerManager(object):
else:
logger.warning("Received invalid event")
else:
event_type = notification_body.get('event_type', '**no event_type**')
event_type = notification_body.get('event_type',
'**no event_type**')
message_id = notification_body.get('message_id', '**no id**')
logger.info("Dropping unconverted %s notification %s" % (event_type, message_id))
logger.info("Dropping unconverted %s notification %s"
% (event_type, message_id))
return None
def _log_statistics(self):
@ -200,13 +229,15 @@ class TriggerManager(object):
self.debug_manager.dump_debuggers()
def _add_or_create_stream(self, trigger_def, event, dist_traits):
stream = self.db.get_active_stream(trigger_def.name, dist_traits, self.current_time())
stream = self.db.get_active_stream(trigger_def.name, dist_traits,
self.current_time())
if stream is None:
trigger_def.debugger.bump_counter("New stream")
stream = self.db.create_stream(trigger_def.name, event, dist_traits,
stream = self.db.create_stream(trigger_def.name, event,
dist_traits,
trigger_def.expiration)
logger.debug("Created New stream %s for %s: distinguished by %s" % (
stream.id, trigger_def.name, str(dist_traits)))
logger.debug("Created New stream %s for %s: distinguished by %s"
% (stream.id, trigger_def.name, str(dist_traits)))
else:
self.db.add_event_stream(stream, event, trigger_def.expiration)
return stream
@ -216,7 +247,7 @@ class TriggerManager(object):
self.db.stream_ready_to_fire(stream, timestamp)
trigger_def.debugger.bump_counter("Ready to fire")
logger.debug("Stream %s ready to fire at %s" % (
stream.id, timestamp))
stream.id, timestamp))
def add_event(self, event):
if self.save_event(event):
@ -224,13 +255,13 @@ class TriggerManager(object):
matched_criteria = trigger_def.match(event)
if matched_criteria:
dist_traits = trigger_def.get_distinguishing_traits(
event, matched_criteria)
event, matched_criteria)
stream = self._add_or_create_stream(trigger_def, event,
dist_traits)
trigger_def.debugger.bump_counter("Added events")
if stream.fire_timestamp is None:
if trigger_def.should_fire(self.db.get_stream_events(
stream)):
stream)):
self._ready_to_fire(stream, trigger_def)
def add_notification(self, notification_body):

View File

@ -1,3 +1,19 @@
# Copyright (c) 2014 Dark Secret Software Inc.
# Copyright (c) 2015 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import daemon
import logging
@ -48,7 +64,7 @@ def main():
timesync = time_sync.TimeSync(conf)
pipe = PipelineManager(conf, time_sync=timesync, proc_name=proc_name)
if args.daemon:
print "Backgrounding for daemon mode."
print("Backgrounding for daemon mode.")
with daemon.DaemonContext():
pipe.run()
else:

View File

@ -1,11 +1,27 @@
# Copyright (c) 2014 Dark Secret Software Inc.
# Copyright (c) 2015 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from yagi.handler import BaseHandler
import yagi.config
from yagi.handler import BaseHandler
from winchester.trigger_manager import TriggerManager
from winchester.config import ConfigManager
from winchester import time_sync
from winchester.trigger_manager import TriggerManager
logger = logging.getLogger(__name__)