diff --git a/.gitignore b/.gitignore index a735f8b..d832fc3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,8 @@ *.py[cod] +AUTHORS +Changelog + # C extensions *.so diff --git a/AUTHORS b/AUTHORS deleted file mode 100644 index 9ee0aa5..0000000 --- a/AUTHORS +++ /dev/null @@ -1 +0,0 @@ -Monsyne Dragon diff --git a/ChangeLog b/ChangeLog deleted file mode 100644 index 0c18624..0000000 --- a/ChangeLog +++ /dev/null @@ -1,62 +0,0 @@ -commit 2d47fa6f6e0de0a54975ff92fc87785a052d4371 -Author: Monsyne Dragon -Date: Mon Sep 8 23:02:52 2014 +0000 - - Add reset stream method. - -commit ca0d09f7bc017ef9e372ae29a0c19bf20b68aca5 -Author: Monsyne Dragon -Date: Mon Sep 8 19:57:24 2014 +0000 - - Save newly generated events from pipeline - - Save newly created events from pipeline run if pipeline commits. - Refactor trigger manager api wart, move save_event call into add_event - to make add_event and add_notification symmetric. - -commit 0c619c133d3c248d62a2c5f6441d4fae0bf7042a -Author: Monsyne Dragon -Date: Sun Sep 7 04:07:20 2014 +0000 - - Add database admin command. - - Add admin command for db schema upgrade/downgrade/etc. - Move alembic migrations so above can find them when installed - as a package. - Fix up packaging to use setup.cfg and pbr. - Flesh out README. - -commit a6f84d16036e143b1b605c50b90055a623e3235b -Author: Monsyne Dragon -Date: Thu Sep 4 20:43:41 2014 +0000 - - Fixed a few bugs, added more logging. - - Fixed timestamp bug, and streamstate issue missed in unittests. - Added more logging for pipeline manager. - -commit c2aa498beb14cf0a61066fe1e7df833a16db5733 -Author: Monsyne Dragon -Date: Thu Sep 4 18:05:19 2014 +0000 - - Move yagi handler into winchester codebase. - -commit a8f373e4bf14762ad09a20f8ad9ea543e11c5be7 -Author: Monsyne Dragon -Date: Thu Sep 4 01:49:19 2014 +0000 - - Added full stream processing, pipeline workers, etc. - - Full trigger logic now works. - Added pipeline workers, and test handler. - Added example configs - Lots of unittests. - -commit aa8fb55e879e782268c663f81e73384673d56847 -Author: Monsyne Dragon -Date: Thu Jun 26 01:55:26 2014 +0000 - - Initial commit of DB schema. - - Initial commit of the event schema for the database. - This includes models and alembic migration. \ No newline at end of file diff --git a/etc/triggers.yaml b/etc/triggers.yaml index 6c82d5a..f415bfb 100644 --- a/etc/triggers.yaml +++ b/etc/triggers.yaml @@ -1,5 +1,6 @@ --- - name: test_trigger + debug_level: 2 distinguished_by: - instance_id - timestamp: "day" diff --git a/tests/test_debugger.py b/tests/test_debugger.py new file mode 100644 index 0000000..baaf8a9 --- /dev/null +++ b/tests/test_debugger.py @@ -0,0 +1,219 @@ +import unittest2 as unittest + +import mock + +from winchester import debugging + + +class TestDebugManager(unittest.TestCase): + + def setUp(self): + super(TestDebugManager, self).setUp() + self.debug_manager = debugging.DebugManager() + + def test_get_debugger_none(self): + debugger = self.debug_manager.get_debugger(None) + self.assertEquals("n/a", debugger._name) + self.assertEquals(2, debugger._debug_level) + + def test_get_debugger_off(self): + tdef = mock.MagicMock(name="tdef") + tdef.name = "my_trigger" + tdef.debug_level = 0 + debugger = self.debug_manager.get_debugger(tdef) + self.assertTrue(isinstance(debugger, debugging.NoOpDebugger)) + self.assertEquals(debugger, + self.debug_manager._debuggers['my_trigger']) + + debugger2 = self.debug_manager.get_debugger(tdef) + self.assertEquals(debugger, debugger2) + + def test_get_debugger_on(self): + tdef = mock.MagicMock(name="tdef") + tdef.name = "my_trigger" + tdef.debug_level = 1 + debugger = self.debug_manager.get_debugger(tdef) + self.assertTrue(isinstance(debugger, debugging.DetailedDebugger)) + self.assertEquals(debugger, + self.debug_manager._debuggers['my_trigger']) + + debugger2 = self.debug_manager.get_debugger(tdef) + self.assertEquals(debugger, debugger2) + + def test_dump_group_level1(self): + debugger = mock.MagicMock(name="debugger") + debugger.get_debug_level.return_value = 1 + group = mock.MagicMock(name="group") + group._name = "my_group" + group._match = 1 + group._mismatch = 2 + debugger.get_group.return_value = group + with mock.patch.object(debugging, "logger") as log: + self.debug_manager.dump_group(debugger, "my_group") + + log.info.assert_called_once_with( + "my_group Criteria: 3 checks, 1 passed") + + def test_dump_group_level2(self): + debugger = mock.MagicMock(name="debugger") + debugger.get_debug_level.return_value = 2 + group = mock.MagicMock(name="group") + group._name = "my_group" + group._match = 1 + group._mismatch = 2 + group._reasons = {"foo": 12} + debugger.get_group.return_value = group + with mock.patch.object(debugging, "logger") as log: + self.debug_manager.dump_group(debugger, "my_group") + + self.assertEquals(log.info.call_args_list, + [mock.call("my_group Criteria: 3 checks, 1 passed"), + mock.call(" - foo = 12")]) + + def test_dump_counters(self): + debugger = mock.MagicMock(name="debugger") + debugger._counters = {'foo': 12} + with mock.patch.object(debugging, "logger") as log: + self.debug_manager.dump_counters(debugger) + log.info.assert_called_once_with('Counter "foo" = 12') + + def test_dump_debuggers_off(self): + debugger = mock.MagicMock(name="debugger") + debugger.get_debug_level.return_value = 0 + self.debug_manager._debuggers = {"foo": debugger} + with mock.patch.object(debugging, "logger") as log: + self.debug_manager.dump_debuggers() + self.assertEqual(0, log.info.call_count) + + def test_dump_debuggers_on(self): + debugger = mock.MagicMock(name="debugger") + debugger.get_debug_level.return_value = 1 + debugger._name = "my_debugger" + group = mock.MagicMock(name="group") + debugger._groups = {"my_group": group} + self.debug_manager._debuggers = {"foo": debugger} + with mock.patch.object(self.debug_manager, "dump_counters") as ctr: + with mock.patch.object(self.debug_manager, "dump_group") as grp: + with mock.patch.object(debugging, "logger") as log: + self.debug_manager.dump_debuggers() + self.assertEquals(log.info.call_args_list, + [mock.call("---- Trigger Definition: my_debugger ----"), + mock.call("----------------------------")]) + grp.assert_called_once_with(debugger, "my_group") + ctr.assert_called_once_with(debugger) + debugger.reset.assert_called_once_with() + + +class TestDetailedDebugger(unittest.TestCase): + + def setUp(self): + super(TestDetailedDebugger, self).setUp() + self.debugger = debugging.DetailedDebugger("my_debugger", 2) + + def test_constructor(self): + with mock.patch("winchester.debugging.DetailedDebugger.reset") \ + as reset: + d = debugging.DetailedDebugger("my_debugger", 2) + reset.assert_called_once_with() + + self.assertEquals(self.debugger._name, "my_debugger") + self.assertEquals(self.debugger._debug_level, 2) + + def test_reset(self): + self.assertEquals(self.debugger._groups, {}) + self.assertEquals(self.debugger._counters, {}) + + def test_get_group(self): + self.assertEquals(self.debugger._groups, {}) + g = self.debugger.get_group("foo") + self.assertEquals(g._name, "foo") + self.assertTrue(self.debugger._groups['foo']) + + def test_bump_counter(self): + self.assertEquals(self.debugger._counters, {}) + self.debugger.bump_counter("foo") + self.assertEquals(self.debugger._counters['foo'], 1) + + self.debugger.bump_counter("foo", 2) + self.assertEquals(self.debugger._counters['foo'], 3) + + def test_get_debug_level(self): + self.assertEquals(self.debugger.get_debug_level(), 2) + + +class TestNoOpDebugger(unittest.TestCase): + def setUp(self): + super(TestNoOpDebugger, self).setUp() + self.debugger = debugging.NoOpDebugger("my_debugger", 2) + + def test_reset(self): + self.debugger.reset() + + def test_get_group(self): + g = self.debugger.get_group("foo") + self.assertEquals(g, self.debugger.noop_group) + + def test_bump_counter(self): + self.debugger.bump_counter("foo") + self.debugger.bump_counter("foo", 2) + + def test_get_debug_level(self): + self.assertEquals(self.debugger.get_debug_level(), 0) + + +class TestGroup(unittest.TestCase): + def setUp(self): + super(TestGroup, self).setUp() + self.group = debugging.Group("my_group") + + def test_constructor(self): + self.assertEquals("my_group", self.group._name) + self.assertEquals(0, self.group._match) + self.assertEquals(0, self.group._mismatch) + self.assertEquals({}, self.group._reasons) + + def test_match(self): + self.assertTrue(self.group.match()) + self.assertEquals(1, self.group._match) + + def test_mismatch(self): + self.assertFalse(self.group.mismatch("reason")) + self.assertEquals(1, self.group._mismatch) + self.assertEquals(1, self.group._reasons['reason']) + + def test_check(self): + self.assertTrue(self.group.check(True, "reason")) + self.assertEquals(1, self.group._match) + self.assertEquals(0, self.group._mismatch) + self.assertEquals({}, self.group._reasons) + + self.assertTrue(self.group.check(True, "reason")) + self.assertEquals(2, self.group._match) + self.assertEquals(0, self.group._mismatch) + self.assertEquals({}, self.group._reasons) + + self.assertFalse(self.group.check(False, "reason")) + self.assertEquals(2, self.group._match) + self.assertEquals(1, self.group._mismatch) + self.assertEquals(1, self.group._reasons['reason']) + + self.assertFalse(self.group.check(False, "reason")) + self.assertEquals(2, self.group._match) + self.assertEquals(2, self.group._mismatch) + self.assertEquals(2, self.group._reasons['reason']) + + +class TestNoOpGroup(unittest.TestCase): + def setUp(self): + super(TestNoOpGroup, self).setUp() + self.group = debugging.NoOpGroup() + + def test_match(self): + self.assertTrue(self.group.match()) + + def test_mismatch(self): + self.assertFalse(self.group.mismatch("reason")) + + def test_check(self): + self.assertTrue(self.group.check(True, "reason")) + self.assertFalse(self.group.check(False, "reason")) diff --git a/tests/test_definition.py b/tests/test_definition.py index d6b5e55..846dfe9 100644 --- a/tests/test_definition.py +++ b/tests/test_definition.py @@ -6,6 +6,7 @@ import mock import datetime import timex +from winchester import debugging from winchester import definition @@ -13,27 +14,29 @@ class TestCriterion(unittest.TestCase): def setUp(self): super(TestCriterion, self).setUp() + self.fake_group = debugging.NoOpGroup() + self.fake_debugger = debugging.NoOpDebugger() def test_basic_criterion(self): c = definition.Criterion(3, 'foo') - self.assertTrue(c.match({'foo': 3})) - self.assertFalse(c.match({'foo': 5})) - self.assertFalse(c.match({'bar': 5})) - self.assertFalse(c.match({'foo': "booga"})) + self.assertTrue(c.match({'foo': 3}, self.fake_group)) + self.assertFalse(c.match({'foo': 5}, self.fake_group)) + self.assertFalse(c.match({'bar': 5}, self.fake_group)) + self.assertFalse(c.match({'foo': "booga"}, self.fake_group)) def test_numeric_criterion(self): c = definition.NumericCriterion("3", 'foo') - self.assertTrue(c.match({'foo': 3})) - self.assertFalse(c.match({'foo': 5})) - self.assertFalse(c.match({'bar': 5})) - self.assertFalse(c.match({'foo': "booga"})) + self.assertTrue(c.match({'foo': 3}, self.fake_group)) + self.assertFalse(c.match({'foo': 5}, self.fake_group)) + self.assertFalse(c.match({'bar': 5}, self.fake_group)) + self.assertFalse(c.match({'foo': "booga"}, self.fake_group)) c = definition.NumericCriterion("> 3", 'foo') - self.assertFalse(c.match({'foo': 3})) - self.assertTrue(c.match({'foo': 5})) + self.assertFalse(c.match({'foo': 3}, self.fake_group)) + self.assertTrue(c.match({'foo': 5}, self.fake_group)) c = definition.NumericCriterion("< 3", 'foo') - self.assertFalse(c.match({'foo': 3})) - self.assertFalse(c.match({'foo': 5})) - self.assertTrue(c.match({'foo': 1})) + self.assertFalse(c.match({'foo': 3}, self.fake_group)) + self.assertFalse(c.match({'foo': 5}, self.fake_group)) + self.assertTrue(c.match({'foo': 1}, self.fake_group)) with self.assertRaises(definition.DefinitionError): c = definition.NumericCriterion("zazz", "foo") with self.assertRaises(definition.DefinitionError): @@ -41,17 +44,17 @@ class TestCriterion(unittest.TestCase): def test_float_criterion(self): c = definition.FloatCriterion("3.14", 'foo') - self.assertTrue(c.match({'foo': 3.14})) - self.assertFalse(c.match({'foo': 5.2})) - self.assertFalse(c.match({'bar': 5.2})) - self.assertFalse(c.match({'foo': "booga"})) + self.assertTrue(c.match({'foo': 3.14}, self.fake_group)) + self.assertFalse(c.match({'foo': 5.2}, self.fake_group)) + self.assertFalse(c.match({'bar': 5.2}, self.fake_group)) + self.assertFalse(c.match({'foo': "booga"}, self.fake_group)) c = definition.FloatCriterion("> 3.14", 'foo') - self.assertFalse(c.match({'foo': 3.14})) - self.assertTrue(c.match({'foo': 5.2})) + self.assertFalse(c.match({'foo': 3.14}, self.fake_group)) + self.assertTrue(c.match({'foo': 5.2}, self.fake_group)) c = definition.FloatCriterion("< 3.14", 'foo') - self.assertFalse(c.match({'foo': 3.14})) - self.assertFalse(c.match({'foo': 3.5})) - self.assertTrue(c.match({'foo': 3.02})) + self.assertFalse(c.match({'foo': 3.14}, self.fake_group)) + self.assertFalse(c.match({'foo': 3.5}, self.fake_group)) + self.assertTrue(c.match({'foo': 3.02}, self.fake_group)) with self.assertRaises(definition.DefinitionError): c = definition.FloatCriterion("zazz", "foo") with self.assertRaises(definition.DefinitionError): @@ -61,25 +64,26 @@ class TestCriterion(unittest.TestCase): c = definition.TimeCriterion("day", "foo") e = dict(timestamp=datetime.datetime(2014,8,1,7,52,31,2), foo=datetime.datetime(2014,8,1,1,2,0,0)) - self.assertTrue(c.match(e)) + self.assertTrue(c.match(e, self.fake_group)) e = dict(timestamp=datetime.datetime(2014,8,1,7,52,31,2), foo=datetime.datetime(2014,8,2,1,2,0,0)) - self.assertFalse(c.match(e)) + self.assertFalse(c.match(e, self.fake_group)) e = dict(timestamp=datetime.datetime(2014,8,1,7,52,31,2), bar=datetime.datetime(2014,8,1,1,2,0,0)) - self.assertFalse(c.match(e)) + self.assertFalse(c.match(e, self.fake_group)) e = dict(timestamp=datetime.datetime(2014,8,1,7,52,31,2), message_id='1234-5678', quux=4, foo=datetime.datetime(2014,8,1,1,2,0,0)) - self.assertTrue(c.match(e)) - + self.assertTrue(c.match(e, self.fake_group)) class TestCriteria(unittest.TestCase): def setUp(self): super(TestCriteria, self).setUp() + self.fake_group = debugging.NoOpGroup() + self.fake_debugger = debugging.NoOpDebugger() def test_defaults(self): criteria = definition.Criteria({}) @@ -128,9 +132,9 @@ class TestCriteria(unittest.TestCase): event1 = dict(event_type = "test.foo.zazz") event2 = dict(event_type = "test.wakka.zazz") event3 = dict(event_type = "test.boingy") - self.assertTrue(criteria.match(event1)) - self.assertFalse(criteria.match(event2)) - self.assertFalse(criteria.match(event3)) + self.assertTrue(criteria.match(event1, self.fake_group)) + self.assertFalse(criteria.match(event2, self.fake_group)) + self.assertFalse(criteria.match(event3, self.fake_group)) def test_match_for_timestamp(self): config = dict(timestamp='day($launched_at)') @@ -143,9 +147,9 @@ class TestCriteria(unittest.TestCase): launched_at=datetime.datetime(2014,8,1,1,2,3,4)) event3 = dict(event_type='test.thing', timestamp=datetime.datetime(2014,8,2,17,16,15,14)) - self.assertTrue(criteria.match(event1)) - self.assertFalse(criteria.match(event2)) - self.assertFalse(criteria.match(event3)) + self.assertTrue(criteria.match(event1, self.fake_group)) + self.assertFalse(criteria.match(event2, self.fake_group)) + self.assertFalse(criteria.match(event3, self.fake_group)) def test_match_for_traits(self): config = dict(traits=dict(some_trait="test", @@ -195,42 +199,49 @@ class TestCriteria(unittest.TestCase): other_trait='text here', memory_mb=4096, test_weight=6.283) - self.assertTrue(criteria.match(event1)) - self.assertFalse(criteria.match(event2)) - self.assertFalse(criteria.match(event3)) - self.assertFalse(criteria.match(event4)) - self.assertFalse(criteria.match(event5)) - self.assertFalse(criteria.match(event6)) + self.assertTrue(criteria.match(event1, self.fake_group)) + self.assertFalse(criteria.match(event2, self.fake_group)) + self.assertFalse(criteria.match(event3, self.fake_group)) + self.assertFalse(criteria.match(event4, self.fake_group)) + self.assertFalse(criteria.match(event5, self.fake_group)) + self.assertFalse(criteria.match(event6, self.fake_group)) + class TestTriggerDefinition(unittest.TestCase): def setUp(self): super(TestTriggerDefinition, self).setUp() + self.debug_manager = debugging.DebugManager() def test_config_error_check_and_defaults(self): with self.assertRaises(definition.DefinitionError): - definition.TriggerDefinition(dict()) + definition.TriggerDefinition(dict(), self.debug_manager) with self.assertRaises(definition.DefinitionError): - definition.TriggerDefinition(dict(name='test_trigger')) + definition.TriggerDefinition(dict(name='test_trigger'), + self.debug_manager) with self.assertRaises(definition.DefinitionError): definition.TriggerDefinition(dict(name='test_trigger', - expiration='$last + 1d')) + expiration='$last + 1d'), + self.debug_manager) with self.assertRaises(definition.DefinitionError): definition.TriggerDefinition(dict(name='test_trigger', expiration='$last + 1d', - fire_pipeline='test_pipeline')) + fire_pipeline='test_pipeline'), + self.debug_manager) with self.assertRaises(definition.DefinitionError): definition.TriggerDefinition( dict(name='test_trigger', expiration='$last + 1d', fire_pipeline='test_pipeline', - fire_criteria=[dict(event_type='test.thing')])) + fire_criteria=[dict(event_type='test.thing')]), + self.debug_manager) tdef = definition.TriggerDefinition( dict(name='test_trigger', expiration='$last + 1d', fire_pipeline='test_pipeline', fire_criteria=[dict(event_type='test.thing')], - match_criteria=[dict(event_type='test.*')])) + match_criteria=[dict(event_type='test.*')]), + self.debug_manager) self.assertEqual(len(tdef.distinguished_by), 0) self.assertEqual(len(tdef.fire_criteria), 1) self.assertIsInstance(tdef.fire_criteria[0], definition.Criteria) @@ -245,7 +256,7 @@ class TestTriggerDefinition(unittest.TestCase): fire_pipeline='test_pipeline', fire_criteria=[dict(event_type='test.thing')], match_criteria=[dict(event_type='test.*')]) - tdef = definition.TriggerDefinition(config) + tdef = definition.TriggerDefinition(config, self.debug_manager) event1 = dict(event_type='test.thing') event2 = dict(event_type='other.thing') self.assertTrue(tdef.match(event1)) @@ -256,7 +267,7 @@ class TestTriggerDefinition(unittest.TestCase): fire_criteria=[dict(event_type='test.thing')], match_criteria=[dict(event_type='test.*'), dict(event_type='other.*')]) - tdef = definition.TriggerDefinition(config) + tdef = definition.TriggerDefinition(config, self.debug_manager) self.assertTrue(tdef.match(event1)) self.assertTrue(tdef.match(event2)) @@ -267,7 +278,7 @@ class TestTriggerDefinition(unittest.TestCase): fire_pipeline='test_pipeline', fire_criteria=[dict(event_type='test.thing')], match_criteria=[dict(event_type='test.*')]) - tdef = definition.TriggerDefinition(config) + tdef = definition.TriggerDefinition(config, self.debug_manager) event1 = dict(event_type='test.thing', instance_id='foo') event2 = dict(event_type='test.thing') self.assertTrue(tdef.match(event1)) @@ -281,7 +292,7 @@ class TestTriggerDefinition(unittest.TestCase): fire_criteria=[dict(event_type='test.thing')], match_criteria=[dict(event_type='test.*')]) event1 = dict(event_type='test.thing', instance_id='foo') - tdef = definition.TriggerDefinition(config) + tdef = definition.TriggerDefinition(config, self.debug_manager) mcriteria = tdef.match(event1) dt = tdef.get_distinguishing_traits(event1, mcriteria) self.assertEqual(len(dt), 1) @@ -297,7 +308,7 @@ class TestTriggerDefinition(unittest.TestCase): match_criteria=[dict(event_type='test.*')]) event1 = dict(event_type='test.thing', instance_id='foo', timestamp=datetime.datetime(2014,8,1,20,4,23,444)) - tdef = definition.TriggerDefinition(config) + tdef = definition.TriggerDefinition(config, self.debug_manager) mcriteria = tdef.match(event1) dt = tdef.get_distinguishing_traits(event1, mcriteria) self.assertEqual(len(dt), 2) @@ -320,7 +331,7 @@ class TestTriggerDefinition(unittest.TestCase): map_distinguished_by=dict(instance_id='other_id'))]) event1 = dict(event_type='test.thing', instance_id='foo', other_id='bar') - tdef = definition.TriggerDefinition(config) + tdef = definition.TriggerDefinition(config, self.debug_manager) mcriteria = tdef.match(event1) dt = tdef.get_distinguishing_traits(event1, mcriteria) self.assertEqual(len(dt), 1) @@ -334,7 +345,7 @@ class TestTriggerDefinition(unittest.TestCase): fire_pipeline='test_pipeline', fire_criteria=[dict(event_type='test.thing')], match_criteria=[dict(event_type='test.*')]) - tdef = definition.TriggerDefinition(config) + tdef = definition.TriggerDefinition(config, self.debug_manager) test_time = datetime.datetime(2014,8,1,20,4,23,444) test_time_plus_1hr = datetime.datetime(2014,8,1,21,4,23,444) ft = tdef.get_fire_timestamp(test_time) @@ -346,7 +357,7 @@ class TestTriggerDefinition(unittest.TestCase): fire_pipeline='test_pipeline', fire_criteria=[dict(event_type='test.thing')], match_criteria=[dict(event_type='test.*')]) - tdef = definition.TriggerDefinition(config) + tdef = definition.TriggerDefinition(config, self.debug_manager) ft = tdef.get_fire_timestamp(test_time) self.assertEqual(ft, test_time_plus_1hr) @@ -357,7 +368,7 @@ class TestTriggerDefinition(unittest.TestCase): fire_pipeline='test_pipeline', fire_criteria=[dict(event_type='test.thing')], match_criteria=[dict(event_type='test.*')]) - tdef = definition.TriggerDefinition(config) + tdef = definition.TriggerDefinition(config, self.debug_manager) events1 = [ dict(event_type='test.foobar'), dict(event_type='test.thing'), dict(event_type='test.thing')] @@ -374,7 +385,7 @@ class TestTriggerDefinition(unittest.TestCase): fire_pipeline='test_pipeline', fire_criteria=[dict(event_type='test.thing', number=2)], match_criteria=[dict(event_type='test.*')]) - tdef = definition.TriggerDefinition(config) + tdef = definition.TriggerDefinition(config, self.debug_manager) self.assertTrue(tdef.should_fire(events1)) self.assertFalse(tdef.should_fire(events2)) self.assertFalse(tdef.should_fire(events3)) diff --git a/tests/test_pipeline_manager.py b/tests/test_pipeline_manager.py index 77889bc..58071bf 100644 --- a/tests/test_pipeline_manager.py +++ b/tests/test_pipeline_manager.py @@ -5,12 +5,16 @@ import mock import datetime import timex -from winchester import pipeline_manager +from winchester import debugging from winchester import db as winch_db +from winchester import pipeline_manager from winchester.models import StreamState class TestPipeline(unittest.TestCase): + def setUp(self): + super(TestPipeline, self).setUp() + self.debugger = debugging.NoOpDebugger() def test_check_handler_config(self): @@ -78,13 +82,13 @@ class TestPipeline(unittest.TestCase): p.commit = mock.MagicMock(name='commit') p.rollback = mock.MagicMock(name='rollback') - ret = p.handle_events(test_events) + ret = p.handle_events(test_events, self.debugger) handler_class1.return_value.handle_events.assert_called_once_with(test_events, p.env) events1 = handler_class1.return_value.handle_events.return_value handler_class2.return_value.handle_events.assert_called_once_with(events1, p.env) events2 = handler_class2.return_value.handle_events.return_value handler_class3.return_value.handle_events.assert_called_once_with(events2, p.env) - p.commit.assert_called_once_with() + p.commit.assert_called_once_with(self.debugger) self.assertFalse(p.rollback.called) self.assertEqual(ret, new_events) @@ -112,8 +116,8 @@ class TestPipeline(unittest.TestCase): p.rollback = mock.MagicMock(name='rollback') with self.assertRaises(pipeline_manager.PipelineExecutionError): - ret = p.handle_events(test_events) - p.rollback.assert_called_once_with() + p.handle_events(test_events, self.debugger) + p.rollback.assert_called_once_with(self.debugger) self.assertFalse(p.commit.called) def test_commit(self): @@ -128,7 +132,7 @@ class TestPipeline(unittest.TestCase): 'other_thing': handler_class2, 'some_thing': handler_class3} p = pipeline_manager.Pipeline("test_pipeline", conf, handler_map) - p.commit() + p.commit(self.debugger) handler_class1.return_value.commit.assert_called_once_with() handler_class2.return_value.commit.assert_called_once_with() handler_class3.return_value.commit.assert_called_once_with() @@ -150,7 +154,7 @@ class TestPipeline(unittest.TestCase): 'other_thing': handler_class2, 'some_thing': handler_class3} p = pipeline_manager.Pipeline("test_pipeline", conf, handler_map) - p.commit() + p.commit(self.debugger) handler_class1.return_value.commit.assert_called_once_with() handler_class2.return_value.commit.assert_called_once_with() handler_class3.return_value.commit.assert_called_once_with() @@ -167,7 +171,7 @@ class TestPipeline(unittest.TestCase): 'other_thing': handler_class2, 'some_thing': handler_class3} p = pipeline_manager.Pipeline("test_pipeline", conf, handler_map) - p.rollback() + p.rollback(self.debugger) handler_class1.return_value.rollback.assert_called_once_with() handler_class2.return_value.rollback.assert_called_once_with() handler_class3.return_value.rollback.assert_called_once_with() @@ -189,7 +193,7 @@ class TestPipeline(unittest.TestCase): 'other_thing': handler_class2, 'some_thing': handler_class3} p = pipeline_manager.Pipeline("test_pipeline", conf, handler_map) - p.rollback() + p.rollback(self.debugger) handler_class1.return_value.rollback.assert_called_once_with() handler_class2.return_value.rollback.assert_called_once_with() handler_class3.return_value.rollback.assert_called_once_with() @@ -199,6 +203,7 @@ class TestPipelineManager(unittest.TestCase): def setUp(self): super(TestPipelineManager, self).setUp() + self.debugger = debugging.NoOpDebugger() @mock.patch.object(pipeline_manager.ConfigManager, 'wrap') def test_complete_stream_nopurge(self, mock_config_wrap): @@ -240,19 +245,23 @@ class TestPipelineManager(unittest.TestCase): pm = pipeline_manager.PipelineManager('test') pm.db = mock.MagicMock(spec=pm.db, name='db') trigger_def = mock.MagicMock(name='trigger_def') + trigger_def.debugger = self.debugger pipeline_name = "test" pipeline_config = mock.MagicMock(name='pipeline_config') stream = mock.MagicMock(name='stream') - pm.add_new_events = mock.MagicMock(name='add_nemw_events') + stream.name = "test" + pm.add_new_events = mock.MagicMock(name='add_new_events') pm.pipeline_handlers = mock.MagicMock(name='pipeline_handlers') - ret = pm._run_pipeline(stream, trigger_def, pipeline_name, pipeline_config) + ret = pm._run_pipeline(stream, trigger_def, pipeline_name, + pipeline_config) pm.db.get_stream_events.assert_called_once_with(stream) - mock_pipeline.assert_called_once_with(pipeline_name, pipeline_config, pm.pipeline_handlers) + mock_pipeline.assert_called_once_with(pipeline_name, pipeline_config, + pm.pipeline_handlers) pipeline = mock_pipeline.return_value pipeline.handle_events.assert_called_once_with( - pm.db.get_stream_events.return_value) + pm.db.get_stream_events.return_value, self.debugger) pm.add_new_events.assert_called_once_with( mock_pipeline.return_value.handle_events.return_value) self.assertTrue(ret) @@ -263,21 +272,26 @@ class TestPipelineManager(unittest.TestCase): pm = pipeline_manager.PipelineManager('test') pm.db = mock.MagicMock(spec=pm.db, name='db') trigger_def = mock.MagicMock(name='trigger_def') + trigger_def.debugger = self.debugger pipeline_name = "test" pipeline_config = mock.MagicMock(name='pipeline_config') stream = mock.MagicMock(name='stream') + stream.name = "test" pm.add_new_events = mock.MagicMock(name='add_nemw_events') pm.pipeline_handlers = mock.MagicMock(name='pipeline_handlers') pipeline = mock_pipeline.return_value - pipeline.handle_events.side_effect = pipeline_manager.PipelineExecutionError('test', 'thing') + pipeline.handle_events.side_effect = \ + pipeline_manager.PipelineExecutionError('test', 'thing') - ret = pm._run_pipeline(stream, trigger_def, pipeline_name, pipeline_config) + ret = pm._run_pipeline(stream, trigger_def, pipeline_name, + pipeline_config) pm.db.get_stream_events.assert_called_once_with(stream) - mock_pipeline.assert_called_once_with(pipeline_name, pipeline_config, pm.pipeline_handlers) + mock_pipeline.assert_called_once_with(pipeline_name, pipeline_config, + pm.pipeline_handlers) pipeline.handle_events.assert_called_once_with( - pm.db.get_stream_events.return_value) + pm.db.get_stream_events.return_value, self.debugger) self.assertFalse(pm.add_new_events.called) self.assertFalse(ret) @@ -298,9 +312,11 @@ class TestPipelineManager(unittest.TestCase): pm._run_pipeline = mock.MagicMock(name='_run_pipeline') pm._run_pipeline.return_value = True - ret = pm.fire_stream("test stream") - pm.db.set_stream_state.assert_called_once_with("test stream", StreamState.firing) - pm._run_pipeline.assert_called_once_with(stream, trigger_def, 'test_fire_pipeline', pipeline_config) + ret = pm.fire_stream(stream) + pm.db.set_stream_state.assert_called_once_with(stream, + StreamState.firing) + pm._run_pipeline.assert_called_once_with(stream, trigger_def, + 'test_fire_pipeline', pipeline_config) self.assertFalse(pm._error_stream.called) pm._complete_stream.assert_called_once_with(stream) self.assertTrue(ret) @@ -344,8 +360,9 @@ class TestPipelineManager(unittest.TestCase): pm._run_pipeline = mock.MagicMock(name='_run_pipeline') pm._run_pipeline.return_value = True - ret = pm.fire_stream("test stream") - pm.db.set_stream_state.assert_called_once_with("test stream", StreamState.firing) + ret = pm.fire_stream(stream) + pm.db.set_stream_state.assert_called_once_with(stream, + StreamState.firing) self.assertFalse(pm._error_stream.called) self.assertFalse(pm._run_pipeline.called) pm._complete_stream.assert_called_once_with(stream) @@ -368,9 +385,12 @@ class TestPipelineManager(unittest.TestCase): pm._run_pipeline = mock.MagicMock(name='_run_pipeline') pm._run_pipeline.return_value = False - ret = pm.fire_stream("test stream") - pm.db.set_stream_state.assert_called_once_with("test stream", StreamState.firing) - pm._run_pipeline.assert_called_once_with(stream, trigger_def, 'test_fire_pipeline', pipeline_config) + ret = pm.fire_stream(stream) + pm.db.set_stream_state.assert_called_once_with(stream, + StreamState.firing) + pm._run_pipeline.assert_called_once_with(stream, trigger_def, + 'test_fire_pipeline', + pipeline_config) self.assertFalse(pm._complete_stream.called) pm._error_stream.assert_called_once_with(stream) self.assertFalse(ret) @@ -392,9 +412,11 @@ class TestPipelineManager(unittest.TestCase): pm._run_pipeline = mock.MagicMock(name='_run_pipeline') pm._run_pipeline.return_value = True - ret = pm.expire_stream("test stream") - pm.db.set_stream_state.assert_called_once_with("test stream", StreamState.expiring) - pm._run_pipeline.assert_called_once_with(stream, trigger_def, 'test_fire_pipeline', pipeline_config) + ret = pm.expire_stream(stream) + pm.db.set_stream_state.assert_called_once_with(stream, + StreamState.expiring) + pm._run_pipeline.assert_called_once_with(stream, trigger_def, + 'test_fire_pipeline', pipeline_config) self.assertFalse(pm._error_stream.called) pm._complete_stream.assert_called_once_with(stream) self.assertTrue(ret) @@ -438,8 +460,9 @@ class TestPipelineManager(unittest.TestCase): pm._run_pipeline = mock.MagicMock(name='_run_pipeline') pm._run_pipeline.return_value = True - ret = pm.expire_stream("test stream") - pm.db.set_stream_state.assert_called_once_with("test stream", StreamState.expiring) + ret = pm.expire_stream(stream) + pm.db.set_stream_state.assert_called_once_with(stream, + StreamState.expiring) self.assertFalse(pm._expire_error_stream.called) self.assertFalse(pm._run_pipeline.called) pm._complete_stream.assert_called_once_with(stream) @@ -462,9 +485,11 @@ class TestPipelineManager(unittest.TestCase): pm._run_pipeline = mock.MagicMock(name='_run_pipeline') pm._run_pipeline.return_value = False - ret = pm.expire_stream("test stream") - pm.db.set_stream_state.assert_called_once_with("test stream", StreamState.expiring) - pm._run_pipeline.assert_called_once_with(stream, trigger_def, 'test_fire_pipeline', pipeline_config) + ret = pm.expire_stream(stream) + pm.db.set_stream_state.assert_called_once_with(stream, + StreamState.expiring) + pm._run_pipeline.assert_called_once_with(stream, trigger_def, + 'test_fire_pipeline', pipeline_config) self.assertFalse(pm._complete_stream.called) pm._expire_error_stream.assert_called_once_with(stream) self.assertFalse(ret) @@ -474,13 +499,17 @@ class TestPipelineManager(unittest.TestCase): pm = pipeline_manager.PipelineManager('test') pm.db = mock.MagicMock(spec=pm.db, name='db') stream = mock.MagicMock(name='stream') + stream.name = "my_stream" + tdef = mock.MagicMock(name='tdef') + pm.trigger_map['my_stream'] = tdef pm.expire_stream = mock.MagicMock(name='expire_stream') pm.fire_stream = mock.MagicMock(name='fire_stream') pm.current_time = mock.MagicMock(name='current_time') pm.db.get_ready_streams.return_value = [stream] ret = pm.process_ready_streams(42) - pm.db.get_ready_streams.assert_called_once_with(42, pm.current_time.return_value, expire=False) + pm.db.get_ready_streams.assert_called_once_with(42, + pm.current_time.return_value, expire=False) pm.fire_stream.assert_called_once_with(stream) self.assertFalse(pm.expire_stream.called) self.assertEqual(ret, 1) @@ -490,13 +519,26 @@ class TestPipelineManager(unittest.TestCase): pm = pipeline_manager.PipelineManager('test') pm.db = mock.MagicMock(spec=pm.db, name='db') stream = mock.MagicMock(name='stream') + stream.name = "my_stream" pm.expire_stream = mock.MagicMock(name='expire_stream') pm.fire_stream = mock.MagicMock(name='fire_stream') pm.current_time = mock.MagicMock(name='current_time') pm.db.get_ready_streams.return_value = [stream] ret = pm.process_ready_streams(42, expire=True) - pm.db.get_ready_streams.assert_called_once_with(42, pm.current_time.return_value, expire=True) + pm.db.get_ready_streams.assert_called_once_with(42, + pm.current_time.return_value, expire=True) pm.expire_stream.assert_called_once_with(stream) self.assertFalse(pm.fire_stream.called) self.assertEqual(ret, 1) + + @mock.patch.object(pipeline_manager.ConfigManager, 'wrap') + def test_safe_get_debugger(self, mock_config_wrap): + pm = pipeline_manager.PipelineManager('test') + tdef = mock.MagicMock(name="tdef") + tdef.name = "my trigger" + tdef.debugger = self.debugger + self.assertEqual(pm.safe_get_debugger(tdef), self.debugger) + + self.assertEqual(pm.safe_get_debugger(None)._name, "n/a") + diff --git a/tests/test_trigger_manager.py b/tests/test_trigger_manager.py index d5c02cc..e91c50b 100644 --- a/tests/test_trigger_manager.py +++ b/tests/test_trigger_manager.py @@ -5,14 +5,17 @@ import mock import datetime import timex -from winchester import trigger_manager -from winchester import definition from winchester import db as winch_db +from winchester import debugging +from winchester import definition +from winchester import trigger_manager + class TestTriggerManager(unittest.TestCase): def setUp(self): super(TestTriggerManager, self).setUp() + self.debugger = debugging.NoOpDebugger() @mock.patch.object(trigger_manager.ConfigManager, 'wrap') def test_save_event(self, mock_config_wrap): @@ -25,7 +28,8 @@ class TestTriggerManager(unittest.TestCase): other_test_trait=42) self.assertTrue(tm.save_event(event)) tm.db.create_event.assert_called_once_with('1234-test-5678', 'test.thing', - datetime.datetime(2014,8,1,10,9,8,77777), dict(test_trait='foobar', other_test_trait=42)) + datetime.datetime(2014,8,1,10,9,8,77777), + dict(test_trait='foobar', other_test_trait=42)) @mock.patch.object(trigger_manager.ConfigManager, 'wrap') def test_save_event_dup(self, mock_config_wrap): @@ -39,7 +43,8 @@ class TestTriggerManager(unittest.TestCase): other_test_trait=42) self.assertFalse(tm.save_event(event)) tm.db.create_event.assert_called_once_with('1234-test-5678', 'test.thing', - datetime.datetime(2014,8,1,10,9,8,77777), dict(test_trait='foobar', other_test_trait=42)) + datetime.datetime(2014,8,1,10,9,8,77777), + dict(test_trait='foobar', other_test_trait=42)) @mock.patch('winchester.trigger_manager.EventCondenser', autospec=True) @mock.patch.object(trigger_manager.ConfigManager, 'wrap') @@ -119,11 +124,12 @@ class TestTriggerManager(unittest.TestCase): event = "eventful!" ret = tm._add_or_create_stream(trigger_def, event, dist_traits) - tm.db.get_active_stream.assert_called_once_with(trigger_def.name, dist_traits, - tm.current_time.return_value) + tm.db.get_active_stream.assert_called_once_with(trigger_def.name, + dist_traits, tm.current_time.return_value) self.assertFalse(tm.db.create_stream.called) - tm.db.add_event_stream.assert_called_once_with(tm.db.get_active_stream.return_value, - event, trigger_def.expiration) + tm.db.add_event_stream.assert_called_once_with( + tm.db.get_active_stream.return_value, + event, trigger_def.expiration) self.assertEqual(ret, tm.db.get_active_stream.return_value) @mock.patch.object(trigger_manager.ConfigManager, 'wrap') @@ -183,6 +189,8 @@ class TestTriggerManager(unittest.TestCase): tm = trigger_manager.TriggerManager('test') tm.db = mock.MagicMock(spec=tm.db) tm.trigger_definitions = [mock.MagicMock() for n in range(3)] + for d in tm.trigger_definitions: + d.debugger = self.debugger m_def = tm.trigger_definitions[2] tm.trigger_definitions[0].match.return_value = None tm.trigger_definitions[1].match.return_value = None @@ -197,12 +205,16 @@ class TestTriggerManager(unittest.TestCase): tm.save_event.assert_called_once_with(event) for td in tm.trigger_definitions: td.match.assert_called_once_with(event) - m_def.get_distinguishing_traits.assert_called_once_with(event, m_def.match.return_value) + m_def.get_distinguishing_traits.assert_called_once_with(event, + m_def.match.return_value) tm._add_or_create_stream.assert_called_once_with(m_def, event, m_def.get_distinguishing_traits.return_value) - tm.db.get_stream_events.assert_called_once_with(tm._add_or_create_stream.return_value) - m_def.should_fire.assert_called_once_with(tm.db.get_stream_events.return_value) - tm._ready_to_fire.assert_called_once_with(tm._add_or_create_stream.return_value, m_def) + tm.db.get_stream_events.assert_called_once_with( + tm._add_or_create_stream.return_value) + m_def.should_fire.assert_called_once_with( + tm.db.get_stream_events.return_value) + tm._ready_to_fire.assert_called_once_with( + tm._add_or_create_stream.return_value, m_def) @mock.patch.object(trigger_manager.ConfigManager, 'wrap') def test_add_event_on_ready_stream(self, mock_config_wrap): @@ -223,7 +235,8 @@ class TestTriggerManager(unittest.TestCase): tm.save_event.assert_called_once_with(event) for td in tm.trigger_definitions: td.match.assert_called_once_with(event) - m_def.get_distinguishing_traits.assert_called_once_with(event, m_def.match.return_value) + m_def.get_distinguishing_traits.assert_called_once_with(event, + m_def.match.return_value) tm._add_or_create_stream.assert_called_once_with(m_def, event, m_def.get_distinguishing_traits.return_value) self.assertFalse(tm.db.get_stream_events.called) @@ -254,5 +267,3 @@ class TestTriggerManager(unittest.TestCase): self.assertFalse(tm._add_or_create_stream.called) self.assertFalse(tm.db.get_stream_events.called) self.assertFalse(tm._ready_to_fire.called) - - diff --git a/winchester/debugging.py b/winchester/debugging.py new file mode 100644 index 0000000..9ea1f91 --- /dev/null +++ b/winchester/debugging.py @@ -0,0 +1,161 @@ +# Copyright (c) 2014 Dark Secret Software Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import logging +import six + +logger = logging.getLogger(__name__) + + +class NoOpGroup(object): + def match(self): + return True + + def mismatch(self, reason): + return False + + def check(self, value, reason): + return value + + +class Group(object): + def __init__(self, name): + self._name = name # Group name + self._match = 0 + self._mismatch = 0 + self._reasons = {} + + def match(self): + self._match += 1 + return True + + def mismatch(self, reason): + count = self._reasons.get(reason, 0) + self._reasons[reason] = count + 1 + self._mismatch += 1 + return False + + def check(self, value, reason): + if value: + return self.match() + return self.mismatch(reason) + + +@six.add_metaclass(abc.ABCMeta) +class BaseDebugger(object): + @abc.abstractmethod + def reset(self): + pass + + @abc.abstractmethod + def get_group(self, name): + pass + + @abc.abstractmethod + def bump_counter(self, name, inc=1): + pass + + @abc.abstractmethod + def get_debug_level(self): + pass + + +class NoOpDebugger(BaseDebugger): + def __init__(self, *args, **kwargs): + self.noop_group = NoOpGroup() + + def reset(self): + pass + + def get_group(self, name): + return self.noop_group + + def bump_counter(self, name, inc=1): + pass + + def get_debug_level(self): + return 0 + + +class DetailedDebugger(BaseDebugger): + def __init__(self, name, debug_level): + super(DetailedDebugger, self).__init__() + self._name = name + self._debug_level = debug_level + self.reset() + + def reset(self): + # If it's not a match or a mismatch it was a fatal error. + self._groups = {} + self._counters = {} + + def get_group(self, name): + group = self._groups.get(name, Group(name)) + self._groups[name] = group + return group + + def bump_counter(self, name, inc=1): + self._counters[name] = self._counters.get(name, 0) + inc + + def get_debug_level(self): + return self._debug_level + + +class DebugManager(object): + def __init__(self): + self._debuggers = {} + + def get_debugger(self, trigger_def): + name = "n/a" + level = 2 # Default these unknowns to full debug. + if trigger_def is not None: + name = trigger_def.name + level = trigger_def.debug_level + debugger = self._debuggers.get(name) + if not debugger: + if level > 0: + debugger = DetailedDebugger(name, level) + else: + debugger = NoOpDebugger() + self._debuggers[name] = debugger + return debugger + + def dump_group(self, debugger, group_name): + group = debugger.get_group(group_name) + logger.info("%s Criteria: %d checks, %d passed" % + (group._name, + group._match + group._mismatch, group._match)) + + if debugger.get_debug_level() > 1: + for kv in group._reasons.items(): + logger.info(" - %s = %d" % kv) + + def dump_counters(self, debugger): + for kv in debugger._counters.items(): + logger.info("Counter \"%s\" = %d" % kv) + + def dump_debuggers(self): + for debugger in self._debuggers.values(): + if debugger.get_debug_level() == 0: + continue + + logger.info("---- Trigger Definition: %s ----" % debugger._name) + for name in debugger._groups.keys(): + self.dump_group(debugger, name) + + self.dump_counters(debugger) + debugger.reset() + logger.info("----------------------------") diff --git a/winchester/definition.py b/winchester/definition.py index c83de23..dc5122d 100644 --- a/winchester/definition.py +++ b/winchester/definition.py @@ -24,7 +24,8 @@ class Criterion(object): def get_from_expression(cls, expression, trait_name): if isinstance(expression, collections.Mapping): if len(expression) != 1: - raise DefinitionError("Only exactly one type of match is allowed per criterion expression") + raise DefinitionError("Only exactly one type of match is " + "allowed per criterion expression") ctype = expression.keys()[0] expr = expression[ctype] if ctype == 'int': @@ -45,16 +46,17 @@ class Criterion(object): self.op = '=' self.value = expr - def match(self, event): + def match(self, event, debug_group): if self.trait_name not in event: - return False + return debug_group.mismatch("not %s" % self.trait_name) value = event[self.trait_name] if self.op == '=': - return value == self.value + return debug_group.check(value == self.value, "== failed") elif self.op == '>': - return value > self.value + return debug_group.check(value > self.value, "> failed") elif self.op == '<': - return value < self.value + return debug_group.check(value < self.value, "< failed") + return debug_group.mismatch("Criterion match() fall-thru") class NumericCriterion(Criterion): @@ -95,16 +97,16 @@ class TimeCriterion(Criterion): self.trait_name = trait_name self.time_expr = timex.parse(expression) - def match(self, event): + def match(self, event, debug_group): if self.trait_name not in event: - return False + return debug_group.mismatch("Time: not '%s'" % self.trait_name) value = event[self.trait_name] try: timerange = self.time_expr(**filter_event_timestamps(event)) except timex.TimexExpressionError: # the event doesn't contain a trait referenced in the expression. - return False - return value in timerange + return debug_group.mismatch("Time: no referenced trait") + return debug_group.check(value in timerange, "Not in timerange") class Criteria(object): @@ -150,30 +152,31 @@ class Criteria(object): return (self.included_type(event_type) and not self.excluded_type(event_type)) - def match(self, event): + def match(self, event, debug_group): if not self.match_type(event['event_type']): - return False + return debug_group.mismatch("Wrong event type") if self.timestamp: try: t = self.timestamp(**filter_event_timestamps(event)) except timex.TimexExpressionError: # the event doesn't contain a trait referenced in the expression. - return False + return debug_group.mismatch("No timestamp trait") if event['timestamp'] not in t: - return False + return debug_group.mismatch("Not time yet.") if not self.traits: - return True - return all(criterion.match(event) for - criterion in self.traits.values()) + return debug_group.match() + return all(criterion.match(event, debug_group) for + criterion in self.traits.values()) class TriggerDefinition(object): - def __init__(self, config): + def __init__(self, config, debug_manager): if 'name' not in config: raise DefinitionError("Required field in trigger definition not " "specified 'name'") self.name = config['name'] + self.debug_level = int(config.get('debug_level', 0)) self.distinguished_by = config.get('distinguished_by', []) for dt in self.distinguished_by: if isinstance(dt, collections.Mapping): @@ -202,19 +205,30 @@ class TriggerDefinition(object): self.load_criteria = [] if 'load_criteria' in config: self.load_criteria = [Criteria(c) for c in config['load_criteria']] + if debug_manager: + self.set_debugger(debug_manager) + + def set_debugger(self, debug_manager): + self.debugger = debug_manager.get_debugger(self) def match(self, event): # all distinguishing traits must exist to match. + group = self.debugger.get_group("Match") for dt in self.distinguished_by: if isinstance(dt, collections.Mapping): trait_name = dt.keys()[0] else: trait_name = dt if trait_name not in event: + group.mismatch("not '%s'" % trait_name) return None + for criteria in self.match_criteria: - if criteria.match(event): + if criteria.match(event, group): + group.match() return criteria + + group.mismatch("No matching criteria") return None def get_distinguishing_traits(self, event, matching_criteria): @@ -237,14 +251,14 @@ class TriggerDefinition(object): return timestamp + datetime.timedelta(seconds=self.fire_delay) def should_fire(self, events): + group = self.debugger.get_group("Fire") for criteria in self.fire_criteria: matches = 0 for event in events: - if criteria.match(event): + if criteria.match(event, group): matches += 1 if matches >= criteria.number: break if matches < criteria.number: - return False - return True - + return group.mismatch("Not enough matching criteria") + return group.match() diff --git a/winchester/pipeline_manager.py b/winchester/pipeline_manager.py index 545c446..5d78b73 100644 --- a/winchester/pipeline_manager.py +++ b/winchester/pipeline_manager.py @@ -59,31 +59,37 @@ class Pipeline(object): raise PipelineExecutionError("Error loading pipeline", e) self.handlers.append(handler) - def handle_events(self, events): + def handle_events(self, events, debugger): event_ids = set(e['message_id'] for e in events) try: for handler in self.handlers: events = handler.handle_events(events, self.env) + debugger.bump_counter("Pre-commit successful") except Exception as e: logger.exception("Error processing pipeline %s" % self.name) - self.rollback() + debugger.bump_counter("Pipeline error") + self.rollback(debugger) raise PipelineExecutionError("Error in pipeline", e) new_events = [e for e in events if e['message_id'] not in event_ids] - self.commit() + self.commit(debugger) return new_events - def commit(self): + def commit(self, debugger): for handler in self.handlers: try: handler.commit() + debugger.bump_counter("Commit successful") except: + debugger.bump_counter("Commit error") logger.exception("Commit error on handler in pipeline %s" % self.name) - def rollback(self): + def rollback(self, debugger): for handler in self.handlers: try: handler.rollback() + debugger.bump_counter("Rollback successful") except: + debugger.bump_counter("Rollback error") logger.exception("Rollback error on handler in pipeline %s" % self.name) @@ -94,22 +100,30 @@ class PipelineManager(object): configs = TriggerManager.config_description() configs.update(dict( pipeline_handlers=ConfigItem(required=True, - help="dictionary of pipeline handlers to load " - "Classes specified with simport syntax. " - "simport docs for more info"), - pipeline_worker_batch_size=ConfigItem(help="Number of streams for pipeline " - "worker(s) to load at a time", default=1000), - pipeline_worker_delay=ConfigItem(help="Number of seconds for pipeline worker to sleep " - "when it finds no streams to process", default=10), + help="dictionary of pipeline handlers to load " + "Classes specified with simport syntax. " + "simport docs for more info"), + pipeline_worker_batch_size=ConfigItem( + help="Number of streams for pipeline " + "worker(s) to load at a time", + default=1000), + pipeline_worker_delay=ConfigItem( + help="Number of seconds for pipeline worker " + "to sleep when it finds no streams to " + "process", default=10), pipeline_config=ConfigItem(required=True, help="Name of pipeline config file " - "defining the handlers for each pipeline."), - purge_completed_streams=ConfigItem(help="Delete successfully proccessed " - "streams when finished?", default=True), + "defining the handlers for each " + "pipeline."), + purge_completed_streams=ConfigItem( + help="Delete successfully proccessed " + "streams when finished?", + default=True), )) return configs - def __init__(self, config, db=None, pipeline_handlers=None, pipeline_config=None, trigger_defs=None): + def __init__(self, config, db=None, pipeline_handlers=None, + pipeline_config=None, trigger_defs=None): logger.debug("PipelineManager: Using config: %s" % str(config)) config = ConfigManager.wrap(config, self.config_description()) self.config = config @@ -142,7 +156,7 @@ class PipelineManager(object): else: defs = config.load_file(config['trigger_definitions']) logger.debug("Loaded trigger definitions %s" % str(defs)) - self.trigger_definitions = [TriggerDefinition(conf) for conf in defs] + self.trigger_definitions = [TriggerDefinition(conf, None) for conf in defs] self.trigger_map = dict((tdef.name, tdef) for tdef in self.trigger_definitions) self.trigger_manager = TriggerManager(self.config, db=self.db, @@ -187,15 +201,19 @@ class PipelineManager(object): self.streams_loaded = 0 self.last_status = self.current_time() + self.trigger_manager.debug_manager.dump_debuggers() + def add_new_events(self, events): for event in events: self.trigger_manager.add_event(event) - def _run_pipeline(self, stream, trigger_def, pipeline_name, pipeline_config): + def _run_pipeline(self, stream, trigger_def, pipeline_name, + pipeline_config): events = self.db.get_stream_events(stream) + debugger = trigger_def.debugger try: pipeline = Pipeline(pipeline_name, pipeline_config, self.pipeline_handlers) - new_events = pipeline.handle_events(events) + new_events = pipeline.handle_events(events, debugger) except PipelineExecutionError: logger.error("Exception in pipeline %s handling stream %s" % ( pipeline_name, stream.id)) @@ -228,15 +246,22 @@ class PipelineManager(object): logger.error("Stream %s locked while trying to set 'expire_error' state! " "This should not happen." % stream.id) + def safe_get_debugger(self, trigger_def): + return trigger_def.debugger if trigger_def is not None else \ + self.trigger_manager.debug_manager.get_debugger(None) + def fire_stream(self, stream): + trigger_def = self.trigger_map.get(stream.name) + debugger = self.safe_get_debugger(trigger_def) try: stream = self.db.set_stream_state(stream, StreamState.firing) except LockError: logger.debug("Stream %s locked. Moving on..." % stream.id) + debugger.bump_counter("Locked") return False logger.debug("Firing Stream %s." % stream.id) - trigger_def = self.trigger_map.get(stream.name) if trigger_def is None: + debugger.bump_counter("Unknown trigger def '%s'" % stream.name) logger.error("Stream %s has unknown trigger definition %s" % ( stream.id, stream.name)) self._error_stream(stream) @@ -245,28 +270,36 @@ class PipelineManager(object): if pipeline is not None: pipe_config = self.pipeline_config.get(pipeline) if pipe_config is None: - logger.error("Trigger %s for stream %s has unknown pipeline %s" % ( - stream.name, stream.id, pipeline)) + debugger.bump_counter("Unknown pipeline '%s'" % pipeline) + logger.error("Trigger %s for stream %s has unknown " + "pipeline %s" % (stream.name, stream.id, + pipeline)) self._error_stream(stream) - if not self._run_pipeline(stream, trigger_def, pipeline, pipe_config): + if not self._run_pipeline(stream, trigger_def, pipeline, + pipe_config): self._error_stream(stream) return False else: logger.debug("No fire pipeline for stream %s. Nothing to do." % ( stream.id)) + debugger.bump_counter("No fire pipeline for '%s'" % stream.name) self._complete_stream(stream) + debugger.bump_counter("Streams fired") self.streams_fired +=1 return True def expire_stream(self, stream): + trigger_def = self.trigger_map.get(stream.name) + debugger = self.safe_get_debugger(trigger_def) try: stream = self.db.set_stream_state(stream, StreamState.expiring) except LockError: + debugger.bump_counter("Locked") logger.debug("Stream %s locked. Moving on..." % stream.id) return False logger.debug("Expiring Stream %s." % stream.id) - trigger_def = self.trigger_map.get(stream.name) if trigger_def is None: + debugger.bump_counter("Unknown trigger def '%s'" % stream.name) logger.error("Stream %s has unknown trigger definition %s" % ( stream.id, stream.name)) self._expire_error_stream(stream) @@ -275,16 +308,20 @@ class PipelineManager(object): if pipeline is not None: pipe_config = self.pipeline_config.get(pipeline) if pipe_config is None: + debugger.bump_counter("Unknown pipeline '%s'" % pipeline) logger.error("Trigger %s for stream %s has unknown pipeline %s" % ( stream.name, stream.id, pipeline)) self._expire_error_stream(stream) - if not self._run_pipeline(stream, trigger_def, pipeline, pipe_config): + if not self._run_pipeline(stream, trigger_def, pipeline, + pipe_config): self._expire_error_stream(stream) return False else: logger.debug("No expire pipeline for stream %s. Nothing to do." % ( stream.id)) + debugger.bump_counter("No expire pipeline for '%s'" % stream.name) self._complete_stream(stream) + debugger.bump_counter("Streams expired") self.streams_expired +=1 return True diff --git a/winchester/trigger_manager.py b/winchester/trigger_manager.py index a9100dc..b48b751 100644 --- a/winchester/trigger_manager.py +++ b/winchester/trigger_manager.py @@ -3,8 +3,9 @@ import logging from stackdistiller import distiller, condenser import simport -from winchester.db import DBInterface, DuplicateError from winchester.config import ConfigManager, ConfigSection, ConfigItem +from winchester import debugging +from winchester.db import DBInterface, DuplicateError from winchester.definition import TriggerDefinition @@ -66,32 +67,37 @@ class TriggerManager(object): @classmethod def config_description(cls): - return dict(config_path=ConfigItem(help="Path(s) to find additional config files", - multiple=True, default='.'), + return dict(config_path=ConfigItem( + help="Path(s) to find additional config files", + multiple=True, default='.'), distiller_config=ConfigItem(required=True, help="Name of distiller config file " "describing what to extract from the " "notifications"), - distiller_trait_plugins=ConfigItem(help="dictionary of trait plugins to load " - "for stackdistiller. Classes specified with " - "simport syntax. See stackdistiller and " - "simport docs for more info", default=dict()), - catch_all_notifications=ConfigItem(help="Store basic info for all notifications," - " even if not listed in distiller config", - default=False), - statistics_period=ConfigItem(help="Emit stats on event counts, etc every " - "this many seconds", default=10), + distiller_trait_plugins=ConfigItem( + help="dictionary of trait plugins to load " + "for stackdistiller. Classes specified with " + "simport syntax. See stackdistiller and " + "simport docs for more info", default=dict()), + catch_all_notifications=ConfigItem( + help="Store basic info for all notifications," + " even if not listed in distiller config", + default=False), + statistics_period=ConfigItem( + help="Emit stats on event counts, etc every " + "this many seconds", default=10), database=ConfigSection(help="Database connection info.", - config_description=DBInterface.config_description()), + config_description=DBInterface.config_description()), trigger_definitions=ConfigItem(required=True, - help="Name of trigger definitions file " - "defining trigger conditions and what events to " - "process for each stream"), - ) + help="Name of trigger definitions file " + "defining trigger conditions and what events to " + "process for each stream"), + ) def __init__(self, config, db=None, stackdistiller=None, trigger_defs=None): config = ConfigManager.wrap(config, self.config_description()) self.config = config + self.debug_manager = debugging.DebugManager() config.check_config() config.add_config_path(*config['config_path']) @@ -110,10 +116,13 @@ class TriggerManager(object): catchall=config['catch_all_notifications']) if trigger_defs is not None: self.trigger_definitions = trigger_defs + for t in self.trigger_definitions: + t.set_debugger(self.debug_manager) else: defs = config.load_file(config['trigger_definitions']) - self.trigger_definitions = [TriggerDefinition(conf) for conf in defs] - self.statistics_period = config['statistics_period'] + self.trigger_definitions = [TriggerDefinition(conf, + self.debug_manager) + for conf in defs] self.saved_events = 0 self.received = 0 self.last_status = self.current_time() @@ -180,9 +189,12 @@ class TriggerManager(object): self.saved_events = 0 self.last_status = self.current_time() + self.debug_manager.dump_debuggers() + def _add_or_create_stream(self, trigger_def, event, dist_traits): stream = self.db.get_active_stream(trigger_def.name, dist_traits, self.current_time()) if stream is None: + trigger_def.debugger.bump_counter("New stream") stream = self.db.create_stream(trigger_def.name, event, dist_traits, trigger_def.expiration) logger.debug("Created New stream %s for %s: distinguished by %s" % ( @@ -194,6 +206,7 @@ class TriggerManager(object): def _ready_to_fire(self, stream, trigger_def): timestamp = trigger_def.get_fire_timestamp(self.current_time()) self.db.stream_ready_to_fire(stream, timestamp) + trigger_def.debugger.bump_counter("Ready to fire") logger.debug("Stream %s ready to fire at %s" % ( stream.id, timestamp)) @@ -202,16 +215,17 @@ class TriggerManager(object): for trigger_def in self.trigger_definitions: matched_criteria = trigger_def.match(event) if matched_criteria: - dist_traits = trigger_def.get_distinguishing_traits(event, matched_criteria) - stream = self._add_or_create_stream(trigger_def, event, dist_traits) + dist_traits = trigger_def.get_distinguishing_traits( + event, matched_criteria) + stream = self._add_or_create_stream(trigger_def, event, + dist_traits) + trigger_def.debugger.bump_counter("Added events") if stream.fire_timestamp is None: - if trigger_def.should_fire(self.db.get_stream_events(stream)): + if trigger_def.should_fire(self.db.get_stream_events( + stream)): self._ready_to_fire(stream, trigger_def) - if (self.current_time() - self.last_status).seconds > self.statistics_period: - self._log_statistics() def add_notification(self, notification_body): event = self.convert_notification(notification_body) if event: self.add_event(event) - diff --git a/winchester/yagi_handler.py b/winchester/yagi_handler.py index a4a541e..1f748d6 100644 --- a/winchester/yagi_handler.py +++ b/winchester/yagi_handler.py @@ -28,3 +28,5 @@ class WinchesterHandler(BaseHandler): for notification in self.iterate_payloads(messages, env): self.trigger_manager.add_notification(notification) + def on_idle(self, num_messages, queue_name): + self.trigger_manager._log_statistics()