diff --git a/alembic.ini b/alembic.ini
index 848a387..4590e00 100644
--- a/alembic.ini
+++ b/alembic.ini
@@ -11,7 +11,8 @@ script_location = alembic
 # the 'revision' command, regardless of autogenerate
 # revision_environment = false
 
-sqlalchemy.url = driver://user:pass@localhost/dbname
+#sqlalchemy.url = driver://user:pass@localhost/dbname
+sqlalchemy.url = mysql://winchester:testpasswd@localhost/winchester
 
 # Logging configuration
diff --git a/alembic/README b/alembic/README
new file mode 100644
index 0000000..98e4f9c
--- /dev/null
+++ b/alembic/README
@@ -0,0 +1 @@
+Generic single-database configuration.
\ No newline at end of file
diff --git a/alembic/versions/44289d1492e6_.py b/alembic/versions/44289d1492e6_.py
new file mode 100644
index 0000000..3c2306d
--- /dev/null
+++ b/alembic/versions/44289d1492e6_.py
@@ -0,0 +1,93 @@
+"""Stream schema.
+
+Revision ID: 44289d1492e6
+Revises: 3ab6d7bf80cd
+Create Date: 2014-08-07 07:34:14.721111
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '44289d1492e6'
+down_revision = '3ab6d7bf80cd'
+
+from alembic import op
+import sqlalchemy as sa
+from winchester import models
+
+
+def upgrade():
+    ### commands auto generated by Alembic - please adjust! ###
+    op.create_table('stream',
+    sa.Column('id', sa.Integer(), nullable=False),
+    sa.Column('first_event', models.PreciseTimestamp(), nullable=False),
+    sa.Column('last_event', models.PreciseTimestamp(), nullable=False),
+    sa.Column('expire_timestamp', models.PreciseTimestamp(), nullable=True),
+    sa.Column('fire_timestamp', models.PreciseTimestamp(), nullable=True),
+    sa.Column('name', sa.String(length=255), nullable=False),
+    sa.Column('state', sa.Integer(), nullable=False),
+    sa.Column('state_serial_no', sa.Integer(), nullable=False),
+    sa.PrimaryKeyConstraint('id')
+    )
+    op.create_index('ix_stream_expire_timestamp', 'stream', ['expire_timestamp'], unique=False)
+    op.create_index('ix_stream_fire_timestamp', 'stream', ['fire_timestamp'], unique=False)
+    op.create_index('ix_stream_name', 'stream', ['name'], unique=False)
+    op.create_index('ix_stream_state', 'stream', ['state'], unique=False)
+    op.create_table('dist_trait',
+    sa.Column('stream_id', sa.Integer(), nullable=False),
+    sa.Column('name', sa.String(length=100), nullable=False),
+    sa.Column('type', sa.Integer(), nullable=True),
+    sa.Column('dt_string', sa.String(length=255), nullable=True),
+    sa.Column('dt_float', sa.Float(), nullable=True),
+    sa.Column('dt_int', sa.Integer(), nullable=True),
+    sa.Column('dt_datetime', models.PreciseTimestamp(), nullable=True),
+    sa.Column('dt_timerange_begin', models.PreciseTimestamp(), nullable=True),
+    sa.Column('dt_timerange_end', models.PreciseTimestamp(), nullable=True),
+    sa.ForeignKeyConstraint(['stream_id'], ['stream.id'], ),
+    sa.PrimaryKeyConstraint('stream_id', 'name')
+    )
+    op.create_index('ix_dist_trait_dt_datetime', 'dist_trait', ['dt_datetime'], unique=False)
+    op.create_index('ix_dist_trait_dt_float', 'dist_trait', ['dt_float'], unique=False)
+    op.create_index('ix_dist_trait_dt_int', 'dist_trait', ['dt_int'], unique=False)
+    op.create_index('ix_dist_trait_dt_string', 'dist_trait', ['dt_string'], unique=False)
+    op.create_index('ix_dist_trait_dt_timerange_begin', 'dist_trait', ['dt_timerange_begin'], unique=False)
+    op.create_index('ix_dist_trait_dt_timerange_end', 'dist_trait', ['dt_timerange_end'], unique=False)
+    op.create_table('streamevent',
+    sa.Column('stream_id', sa.Integer(), nullable=False),
+    sa.Column('event_id', sa.Integer(), nullable=False),
+    sa.ForeignKeyConstraint(['event_id'], ['event.id'], ),
+    sa.ForeignKeyConstraint(['stream_id'], ['stream.id'], ),
+    sa.PrimaryKeyConstraint('stream_id', 'event_id')
+    )
+    op.create_index('ix_event_generated', 'event', ['generated'], unique=False)
+    op.create_index('ix_event_message_id', 'event', ['message_id'], unique=False)
+    op.create_index('ix_event_type_id', 'event', ['event_type_id'], unique=False)
+    op.create_index('ix_trait_t_datetime', 'trait', ['t_datetime'], unique=False)
+    op.create_index('ix_trait_t_float', 'trait', ['t_float'], unique=False)
+    op.create_index('ix_trait_t_int', 'trait', ['t_int'], unique=False)
+    op.create_index('ix_trait_t_string', 'trait', ['t_string'], unique=False)
+    ### end Alembic commands ###
+
+
+def downgrade():
+    ### commands auto generated by Alembic - please adjust! ###
+    op.drop_index('ix_trait_t_string', table_name='trait')
+    op.drop_index('ix_trait_t_int', table_name='trait')
+    op.drop_index('ix_trait_t_float', table_name='trait')
+    op.drop_index('ix_trait_t_datetime', table_name='trait')
+    op.drop_index('ix_event_type_id', table_name='event')
+    op.drop_index('ix_event_message_id', table_name='event')
+    op.drop_index('ix_event_generated', table_name='event')
+    op.drop_table('streamevent')
+    op.drop_index('ix_dist_trait_dt_timerange_end', table_name='dist_trait')
+    op.drop_index('ix_dist_trait_dt_timerange_begin', table_name='dist_trait')
+    op.drop_index('ix_dist_trait_dt_string', table_name='dist_trait')
+    op.drop_index('ix_dist_trait_dt_int', table_name='dist_trait')
+    op.drop_index('ix_dist_trait_dt_float', table_name='dist_trait')
+    op.drop_index('ix_dist_trait_dt_datetime', table_name='dist_trait')
+    op.drop_table('dist_trait')
+    op.drop_index('ix_stream_state', table_name='stream')
+    op.drop_index('ix_stream_name', table_name='stream')
+    op.drop_index('ix_stream_fire_timestamp', table_name='stream')
+    op.drop_index('ix_stream_expire_timestamp', table_name='stream')
+    op.drop_table('stream')
+    ### end Alembic commands ###
diff --git a/etc/event_definitions.yaml b/etc/event_definitions.yaml
new file mode 100644
index 0000000..b4f00d3
--- /dev/null
+++ b/etc/event_definitions.yaml
@@ -0,0 +1,63 @@
+---
+- event_type: compute.instance.*
+  traits: &instance_traits
+    tenant_id:
+      fields: payload.tenant_id
+    user_id:
+      fields: payload.user_id
+    instance_id:
+      fields: payload.instance_id
+    host:
+      fields: publisher_id
+      plugin:
+        name: split
+        parameters:
+          segment: 1
+          max_split: 1
+    service:
+      fields: publisher_id
+      plugin: split
+    memory_mb:
+      type: int
+      fields: payload.memory_mb
+    disk_gb:
+      type: int
+      fields: payload.disk_gb
+    root_gb:
+      type: int
+      fields: payload.root_gb
+    ephemeral_gb:
+      type: int
+      fields: payload.ephemeral_gb
+    vcpus:
+      type: int
+      fields: payload.vcpus
+    instance_type_id:
+      type: int
+      fields: payload.instance_type_id
+    instance_type:
+      fields: payload.instance_type
+    state:
+      fields: payload.state
+    os_architecture:
+      fields: payload.image_meta.'org.openstack__1__architecture'
+    os_version:
+      fields: payload.image_meta.'org.openstack__1__os_version'
+    os_distro:
+      fields: payload.image_meta.'org.openstack__1__os_distro'
+    launched_at:
+      type: datetime
+      fields: payload.launched_at
+    deleted_at:
+      type: datetime
+      fields: payload.deleted_at
+- event_type: compute.instance.exists
+  traits:
+    <<: *instance_traits
+    audit_period_beginning:
+      type: datetime
+      fields: payload.audit_period_beginning
+    audit_period_ending:
+      type: datetime
+      fields: payload.audit_period_ending
+
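For reference: given the definitions above, stackdistiller flattens a notification into one dict of typed traits. The sketch below shows a hypothetical compute.instance.exists notification and, roughly, the traits it would yield; the payload values are invented for illustration, and only the trait names and field paths come from the YAML.

import datetime

# Hypothetical incoming notification (values invented for illustration).
notification = {
    'event_type': 'compute.instance.exists',
    'publisher_id': 'compute.host-01.example.com',
    'payload': {
        'tenant_id': 'abc123',
        'instance_id': 'aaaa-bbbb-cccc-dddd',
        'memory_mb': '4096',
        'audit_period_beginning': '2014-08-01 00:00:00',
    },
}

# Roughly the distilled result: a flat dict of typed traits.
expected_traits = {
    'tenant_id': 'abc123',                 # fields: payload.tenant_id
    'instance_id': 'aaaa-bbbb-cccc-dddd',  # fields: payload.instance_id
    'host': 'host-01.example.com',         # split plugin: segment 1 of publisher_id
    'memory_mb': 4096,                     # type: int coerces the string
    'audit_period_beginning': datetime.datetime(2014, 8, 1, 0, 0),  # type: datetime
}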
diff --git a/etc/logging.conf b/etc/logging.conf
new file mode 100644
index 0000000..f8d2ad4
--- /dev/null
+++ b/etc/logging.conf
@@ -0,0 +1,63 @@
+[loggers]
+keys = root, winchester
+
+[handlers]
+keys = stderr, stdout, watchedfile, syslog, null
+
+[formatters]
+keys = winchester, default
+
+[logger_root]
+level = WARNING
+handlers = null
+
+[logger_winchester]
+level = DEBUG
+handlers = stderr
+qualname = winchester
+
+[logger_amqplib]
+level = WARNING
+handlers = stderr
+qualname = amqplib
+
+[logger_sqlalchemy]
+level = WARNING
+handlers = stderr
+qualname = sqlalchemy
+# "level = INFO" logs SQL queries.
+# "level = DEBUG" logs SQL queries and results.
+# "level = WARNING" logs neither. (Recommended for production systems.)
+
+[handler_stderr]
+class = StreamHandler
+args = (sys.stderr,)
+formatter = winchester
+
+[handler_stdout]
+class = StreamHandler
+args = (sys.stdout,)
+formatter = winchester
+
+[handler_watchedfile]
+class = handlers.WatchedFileHandler
+args = ('winchester.log',)
+formatter = winchester
+
+[handler_syslog]
+class = handlers.SysLogHandler
+args = ('/dev/log', handlers.SysLogHandler.LOG_USER)
+formatter = winchester
+
+[handler_null]
+class = NullHandler
+formatter = default
+args = ()
+
+[formatter_winchester]
+# substitutions available for formats are documented at:
+# https://docs.python.org/2/library/logging.html#logrecord-attributes
+format = [%(levelname)s at %(asctime)s line: %(lineno)d] %(message)s
+
+[formatter_default]
+format = %(message)s
diff --git a/etc/pipelines.yaml b/etc/pipelines.yaml
new file mode 100644
index 0000000..85ffd4a
--- /dev/null
+++ b/etc/pipelines.yaml
@@ -0,0 +1,5 @@
+---
+test_pipeline:
+    - logger
+test_expire_pipeline:
+    - logger
diff --git a/etc/triggers.yaml b/etc/triggers.yaml
new file mode 100644
index 0000000..6c82d5a
--- /dev/null
+++ b/etc/triggers.yaml
@@ -0,0 +1,22 @@
+---
+- name: test_trigger
+  distinguished_by:
+    - instance_id
+    - timestamp: "day"
+  expiration: "$last + 1h"
+  fire_pipeline: "test_pipeline"
+  expire_pipeline: "test_expire_pipeline"
+  match_criteria:
+    - event_type:
+        - compute.instance.*
+        - "!compute.instance.exists"
+#### Traits are optional.
+#      traits:
+#        os_distro: ubuntu
+#        memory_mb:
+#          numeric: "> 4096"
+    - event_type: compute.instance.exists
+      map_distinguished_by:
+        timestamp: audit_period_beginning
+  fire_criteria:
+    - event_type: compute.instance.exists
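To make the trigger above concrete: each matching event is routed to a stream keyed by its distinguishing traits (instance_id plus the day of its timestamp; for exists events the timestamp is taken from audit_period_beginning via map_distinguished_by), and the timex expression "$last + 1h" pushes the expiry an hour past the newest event every time one arrives. A plain-Python sketch of that bookkeeping, stdlib only, not the winchester implementation:

import datetime

def stream_key(event):
    # distinguished_by: instance_id, plus the event's timestamp rounded to the day
    day = event['timestamp'].replace(hour=0, minute=0, second=0, microsecond=0)
    return (event['instance_id'], day)

def expire_at(last_event_timestamp):
    # expiration: "$last + 1h" -- recomputed each time an event is added
    return last_event_timestamp + datetime.timedelta(hours=1)

def ready_to_fire(stream_events):
    # fire_criteria: the stream fires once it holds a compute.instance.exists event
    return any(e['event_type'] == 'compute.instance.exists' for e in stream_events)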
diff --git a/etc/winchester.yaml b/etc/winchester.yaml
new file mode 100644
index 0000000..338daee
--- /dev/null
+++ b/etc/winchester.yaml
@@ -0,0 +1,34 @@
+---
+###### This adds directories to the search path for other config files.
+config_path: /home/winchester/etc
+## It can also be a list:
+#config_path:
+#    - /etc/winchester
+#    - /home/wherever
+
+###### logging
+log_level: debug
+## You can also use a full logging config file.
+#logging_config: logging.conf
+
+###### How often to log stats
+statistics_period: 10
+
+pipeline_worker_batch_size: 1000
+pipeline_worker_delay: 10
+
+####### You can specify extra stackdistiller trait plugins here:
+#distiller_trait_plugins:
+#    test: some.module.path:TestTraitPlugin
+
+catch_all_notifications: false
+
+database:
+    url: mysql://winchester:testpasswd@localhost/winchester
+
+distiller_config: event_definitions.yaml
+trigger_definitions: triggers.yaml
+pipeline_config: pipelines.yaml
+
+pipeline_handlers:
+    logger: winchester.pipeline_handler:LoggingHandler
diff --git a/requirements.txt b/requirements.txt
index 2bbf435..c0e0ae7 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,8 @@
+simport
+stackdistiller
+timex
+python-daemon
+MySQL-python
 alembic>=0.4.1
 enum34>=1.0
 SQLAlchemy>=0.9.6
diff --git a/setup.cfg b/setup.cfg
index 8c28267..973f459 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,3 +1,10 @@
 [metadata]
 description-file = README.md
 
+[files]
+packages =
+    winchester
+
+[entry_points]
+console_scripts =
+    pipeline_worker = winchester.worker:main
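The pieces above tie together as follows: the pipeline_worker console script runs winchester.worker:main, which loads winchester.yaml; the pipeline_handlers map uses the same "module:attribute" notation that simport resolves, and each pipeline in pipelines.yaml is just an ordered list of handler names. A sketch of that resolution, with the handler_map contents taken from the config above and Pipeline's signature as exercised by the tests later in this patch:

from winchester.pipeline_handler import LoggingHandler  # winchester.pipeline_handler:LoggingHandler
from winchester.pipeline_manager import Pipeline

handler_map = {'logger': LoggingHandler}  # from pipeline_handlers above
pipeline_config = ['logger']              # test_pipeline in pipelines.yaml

# Pipeline(name, config, handler_map) instantiates each named handler in order.
pipeline = Pipeline('test_pipeline', pipeline_config, handler_map)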
diff --git a/tests/test_db.py b/tests/test_db.py
new file mode 100644
index 0000000..9b1e72a
--- /dev/null
+++ b/tests/test_db.py
@@ -0,0 +1,401 @@
+#for Python2.6 compatibility.
+import unittest2 as unittest
+
+import mock
+import logging
+
+import datetime
+import timex
+from winchester import db, models
+
+logging.basicConfig()
+
+TEST_DATA = [
+    {'event_type': [
+        dict(id=1, desc='test.thing.begin'),
+        dict(id=2, desc='test.thing.end'),
+        dict(id=3, desc='test.otherthing.foo'),
+    ]},
+    {'event': [
+        dict(id=1,
+             message_id='1234-5678-001',
+             generated=datetime.datetime(2014,8,1,10,20,45,453201),
+             event_type_id=1,),
+        dict(id=2,
+             message_id='1234-5678-002',
+             generated=datetime.datetime(2014,8,1,15,25,45,453201),
+             event_type_id=2,),
+        dict(id=3,
+             message_id='1234-5678-003',
+             generated=datetime.datetime(2014,8,1,2,10,12,0),
+             event_type_id=3,),
+        dict(id=4,
+             message_id='1234-5678-004',
+             generated=datetime.datetime(2014,8,1,4,57,55,42),
+             event_type_id=3,),
+    ]},
+    {'trait': [
+        dict(event_id=1, name='instance_id', type=int(models.Datatype.string),
+             t_string='aaaa-bbbb-cccc-dddd'),
+        dict(event_id=1, name='memory_mb', type=int(models.Datatype.int),
+             t_int=1024),
+        dict(event_id=1, name='test_weight', type=int(models.Datatype.float),
+             t_float=20112.42),
+        dict(event_id=1, name='launched_at', type=int(models.Datatype.datetime),
+             t_datetime=datetime.datetime(2014,7,1,2,30,45,453201)),
+    ]},
+    {'stream': [
+        dict(id=1, first_event=datetime.datetime(2014,8,1,2,10,12,0),
+             last_event=datetime.datetime(2014,8,1,4,57,55,42),
+             name='test_trigger',
+             expire_timestamp=datetime.datetime(2014,8,2,4,57,55,42),
+             state=int(models.StreamState.active),
+             state_serial_no=0),
+        dict(id=2, first_event=datetime.datetime(2014,8,1,15,25,45,453201),
+             last_event=datetime.datetime(2014,8,1,15,25,45,453201),
+             name='another_test_trigger',
+             expire_timestamp=datetime.datetime(2014,8,2,4,57,55,42),
+             state=int(models.StreamState.active),
+             state_serial_no=0),
+        dict(id=3, first_event=datetime.datetime(2014,8,1,15,25,45,453201),
+             last_event=datetime.datetime(2014,8,1,15,25,45,453201),
+             name='fire_test_trigger',
+             fire_timestamp=datetime.datetime(2014,8,10,6,0,0,42),
+             expire_timestamp=datetime.datetime(2014,8,15,6,0,0,42),
+             state=int(models.StreamState.active),
+             state_serial_no=0),
+        dict(id=4, first_event=datetime.datetime(2014,8,1,15,25,45,453201),
+             last_event=datetime.datetime(2014,8,1,15,25,45,453201),
+             name='fire_test_trigger',
+             fire_timestamp=datetime.datetime(2014,8,11,6,0,0,42),
+             expire_timestamp=datetime.datetime(2014,8,16,0,0,0,42),
+             state=int(models.StreamState.active),
+             state_serial_no=0),
+    ]},
+    {'streamevent': [
+        dict(stream_id=1, event_id=3),
+        dict(stream_id=1, event_id=4),
+        dict(stream_id=2, event_id=2),
+        dict(stream_id=3, event_id=2),
+        dict(stream_id=3, event_id=1),
+        dict(stream_id=4, event_id=2),
+    ]},
+    {'dist_trait': [
+        dict(stream_id=1, name='instance_id', type=int(models.Datatype.string),
+             dt_string='zzzz-xxxx-yyyy-wwww'),
+        dict(stream_id=1, name='memory_mb', type=int(models.Datatype.int),
+             dt_int=4096),
+        dict(stream_id=1, name='test_weight', type=int(models.Datatype.float),
+             dt_float=3.1415),
+        dict(stream_id=1, name='launched_at', type=int(models.Datatype.datetime),
+             dt_datetime=datetime.datetime(2014,7,8,9,40,50,77777)),
+        dict(stream_id=1, name='timestamp', type=int(models.Datatype.timerange),
+             dt_timerange_begin=datetime.datetime(2014,7,8,0,0,0,27),
+             dt_timerange_end=datetime.datetime(2014,7,9,0,0,0,27)),
+    ]},
+]
+
+
+def create_tables(dbi):
+    #used for tests
+    models.Base.metadata.create_all(dbi.engine)
+
+
+def load_fixture_data(dbi, data):
+    #Used for tests. This is fugly, refactor later (mdragon)
+    for table in data:
+        for table_name, rows in table.items():
+            for row in rows:
+                cols = []
+                vals = []
+                for col, val in row.items():
+                    cols.append(col)
+                    vals.append(val)
+
+                q = ("INSERT into %(table)s (%(colnames)s) VALUES (%(qs)s)" %
+                     dict(table=table_name,
+                          colnames=','.join(cols),
+                          qs=','.join(('?',) * len(vals)),))
+                dbi.engine.execute(q, vals)
+
+
+class TestDB(unittest.TestCase):
+
+    def setUp(self):
+        super(TestDB, self).setUp()
+        self.db = db.DBInterface(dict(url='sqlite://'))
+        create_tables(self.db)
+        load_fixture_data(self.db, TEST_DATA)
+        logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
+
+    def tearDown(self):
+        logging.getLogger('sqlalchemy.engine').setLevel(logging.WARNING)
+        self.db.close()
+
+    def test_get_event_type(self):
+        t = self.db.get_event_type('test.thing.begin')
+        self.assertEqual(t.id, 1)
+        t = self.db.get_event_type('test.not_in_db')
+        self.assertEqual(t.id, 4) #next unused id.
+
+    def test_create_event(self):
+        message_id = '9876-0001-0001'
+        event_type = 'test.thing.begin'
+        timestamp = datetime.datetime(2014,7,4,12,7,21,4096)
+        traits = dict(test_string='foobar',
+                      test_number=42,
+                      test_float=3.1415,
+                      test_date=datetime.datetime(2014,7,1,0,0,0,0),
+                      somevalue=u'A fine test string')
+        self.db.create_event(message_id, event_type, timestamp, traits)
+        event = self.db.get_event_by_message_id(message_id)
+        self.assertEqual(len(event), 8)
+        self.assertEqual(event['message_id'], message_id)
+        self.assertEqual(event['event_type'], event_type)
+        self.assertEqual(event['timestamp'], timestamp)
+        for name, value in traits.items():
+            self.assertEqual(event[name], value)
+            if type(value) == str:
+                t_value = unicode
+            else:
+                t_value = type(value)
+            self.assertEqual(type(event[name]), t_value)
+
+    def test_create_event_duplicate(self):
+        message_id = '9876-0001-0001'
+        event_type = 'test.thing.begin'
+        timestamp = datetime.datetime(2014,7,4,12,7,21,4096)
+        traits = dict(test_string='foobar',
+                      test_number=42,
+                      test_float=3.1415,
+                      test_date=datetime.datetime(2014,7,1,0,0,0,0),
+                      somevalue=u'A fine test string')
+        self.db.create_event(message_id, event_type, timestamp, traits)
+        with self.assertRaises(db.DuplicateError):
+            self.db.create_event(message_id, event_type, timestamp, traits)
+
+    def test_get_event_by_message_id(self):
+        event = self.db.get_event_by_message_id('1234-5678-001')
+        self.assertEqual(len(event), 7)
+        expected = dict(message_id='1234-5678-001',
+                        event_type='test.thing.begin',
+                        timestamp=datetime.datetime(2014,8,1,10,20,45,453201),
+                        instance_id='aaaa-bbbb-cccc-dddd',
+                        memory_mb=1024,
+                        test_weight=20112.42,
+                        launched_at=datetime.datetime(2014,7,1,2,30,45,453201),)
+        self.assertDictContainsSubset(expected, event)
+
+    def test_get_stream_events(self):
+        stream = self.db.get_stream_by_id(1)
+        events = self.db.get_stream_events(stream)
+        self.assertEqual(len(events), 2)
+        self.assertIn('1234-5678-003', [e['message_id'] for e in events])
+        self.assertIn('1234-5678-004', [e['message_id'] for e in events])
+
+    def test_create_stream(self):
+        event = dict(message_id='1234-5678-001',
+                     event_type='test.thing.begin',
+                     timestamp=datetime.datetime(2014,8,1,10,20,45,453201),
+                     instance_id='aaaa-bbbb-cccc-dddd',
+                     memory_mb=1024,
+                     test_weight=20112.42,
+                     launched_at=datetime.datetime(2014,7,1,2,30,45,453201),)
+        timestamp = timex.TimeRange(datetime.datetime(2014,8,1,0,0,0,27),
+                                    datetime.datetime(2014,8,2,0,0,0,27))
+        dist_traits = dict(timestamp=timestamp,
+                           instance_id='aaaa-bbbb-cccc-dddd')
+
+        class MockTimestamp(object):
+            pass
+
+        mock_expire_value = datetime.datetime(2014,8,2,12,12,12,12)
+
+        def mock_time_expr(first, last):
+            self.assertEqual(first, datetime.datetime(2014,8,1,10,20,45,453201))
+            self.assertEqual(last, datetime.datetime(2014,8,1,10,20,45,453201))
+            t = MockTimestamp()
+            t.timestamp = mock_expire_value
+            return t
+
+        stream = self.db.create_stream('test_create_stream', event, dist_traits, mock_time_expr)
+        self.assertEqual(stream.name, 'test_create_stream')
+        self.assertEqual(stream.first_event, datetime.datetime(2014,8,1,10,20,45,453201))
+        self.assertEqual(stream.last_event, datetime.datetime(2014,8,1,10,20,45,453201))
+        self.assertEqual(stream.expire_timestamp, mock_expire_value)
+        self.assertIsNone(stream.fire_timestamp)
+        self.assertEqual(stream.state, models.StreamState.active)
+        self.assertEqual(stream.state_serial_no, 0)
+        self.assertTrue(self.db.stream_has_dist_trait(stream.id, 'timestamp', timestamp))
+        self.assertTrue(self.db.stream_has_dist_trait(stream.id,
+                                                      'instance_id',
+                                                      'aaaa-bbbb-cccc-dddd'))
+        events = self.db.get_stream_events(stream)
+        self.assertEqual(len(events), 1)
+        self.assertEqual(events[0]['message_id'], '1234-5678-001')
+
+    def test_add_event_stream(self):
+        stream = self.db.get_stream_by_id(1)
+        event = dict(message_id='1234-5678-001',
+                     event_type='test.thing.begin',
+                     timestamp=datetime.datetime(2014,8,1,10,20,45,453201),
+                     instance_id='aaaa-bbbb-cccc-dddd',
+                     memory_mb=1024,
+                     test_weight=20112.42,
+                     launched_at=datetime.datetime(2014,7,1,2,30,45,453201),)
+
+        class MockTimestamp(object):
+            pass
+
+        mock_expire_value = datetime.datetime(2014,8,2,12,12,12,12)
+
+        def mock_time_expr(first, last):
+            self.assertEqual(first, datetime.datetime(2014,8,1,2,10,12,0))
+            self.assertEqual(last, datetime.datetime(2014,8,1,10,20,45,453201))
+            t = MockTimestamp()
+            t.timestamp = mock_expire_value
+            return t
+
+        self.db.add_event_stream(stream, event, mock_time_expr)
+        self.assertEqual(stream.expire_timestamp, mock_expire_value)
+        self.assertEqual(stream.first_event, datetime.datetime(2014,8,1,2,10,12,0))
+        self.assertEqual(stream.last_event, datetime.datetime(2014,8,1,10,20,45,453201))
+        events = self.db.get_stream_events(stream)
+        self.assertEqual(len(events), 3)
+        self.assertIn('1234-5678-001', [e['message_id'] for e in events])
+        self.assertIn('1234-5678-003', [e['message_id'] for e in events])
+        self.assertIn('1234-5678-004', [e['message_id'] for e in events])
+
+    def test_stream_dist_traits(self):
+        with self.db.in_session() as session:
+            stream = self.db.get_stream_by_id(1, session=session)
+            dist_traits = stream.distinguished_by_dict
+            self.assertEqual(len(dist_traits), 5)
+            self.assertIn('instance_id', dist_traits)
+            self.assertEqual(dist_traits['instance_id'], 'zzzz-xxxx-yyyy-wwww')
+            self.assertEqual(type(dist_traits['instance_id']), unicode)
+            self.assertIn('memory_mb', dist_traits)
+            self.assertEqual(dist_traits['memory_mb'], 4096)
+            self.assertEqual(type(dist_traits['memory_mb']), int)
+            self.assertIn('test_weight', dist_traits)
+            self.assertEqual(dist_traits['test_weight'], 3.1415)
+            self.assertEqual(type(dist_traits['test_weight']), float)
+            self.assertIn('launched_at', dist_traits)
+            self.assertEqual(dist_traits['launched_at'], datetime.datetime(2014,7,8,9,40,50,77777))
+            self.assertEqual(type(dist_traits['launched_at']), datetime.datetime)
+            self.assertIn('timestamp', dist_traits)
+            timestamp = dist_traits['timestamp']
+            self.assertEqual(type(timestamp), timex.TimeRange)
+            self.assertEqual(timestamp.begin, datetime.datetime(2014,7,8,0,0,0,27))
+            self.assertEqual(timestamp.end, datetime.datetime(2014,7,9,0,0,0,27))
+
+    def test_stream_has_dist_trait(self):
+        #this mostly tests that the polymorphic trait comparisons are working.
+        dt = self.db.stream_has_dist_trait(1, 'instance_id', 'zzzz-xxxx-yyyy-wwww')
+        self.assertIsNotNone(dt)
+        self.assertEqual(len(dt), 1)
+        self.assertIn('instance_id', dt)
+        self.assertEqual(dt['instance_id'], 'zzzz-xxxx-yyyy-wwww')
+
+        dt = self.db.stream_has_dist_trait(1, 'memory_mb', 4096)
+        self.assertIsNotNone(dt)
+        self.assertEqual(len(dt), 1)
+        self.assertIn('memory_mb', dt)
+        self.assertEqual(dt['memory_mb'], 4096)
+
+        dt = self.db.stream_has_dist_trait(1, 'test_weight', 3.1415)
+        self.assertIsNotNone(dt)
+        self.assertEqual(len(dt), 1)
+        self.assertIn('test_weight', dt)
+        self.assertEqual(dt['test_weight'], 3.1415)
+
+        launched = datetime.datetime(2014,7,8,9,40,50,77777)
+        dt = self.db.stream_has_dist_trait(1, 'launched_at', launched)
+        self.assertIsNotNone(dt)
+        self.assertEqual(len(dt), 1)
+        self.assertIn('launched_at', dt)
+        self.assertEqual(dt['launched_at'], launched)
+
+        timestamp = timex.TimeRange(datetime.datetime(2014,7,8,0,0,0,27),
+                                    datetime.datetime(2014,7,9,0,0,0,27))
+        dt = self.db.stream_has_dist_trait(1, 'timestamp', timestamp)
+        self.assertIsNotNone(dt)
+        self.assertEqual(len(dt), 1)
+        self.assertIn('timestamp', dt)
+        self.assertEqual(dt['timestamp'].begin, timestamp.begin)
+        self.assertEqual(dt['timestamp'].end, timestamp.end)
+
+    def test_get_active_stream(self):
+        timestamp = timex.TimeRange(datetime.datetime(2014,7,8,0,0,0,27),
+                                    datetime.datetime(2014,7,9,0,0,0,27))
+        dist_traits = dict(instance_id='zzzz-xxxx-yyyy-wwww',
+                           memory_mb=4096,
+                           test_weight=3.1415,
+                           launched_at=datetime.datetime(2014,7,8,9,40,50,77777),
+                           timestamp=timestamp)
+        current_time = datetime.datetime(2014,8,2,1,0,0,2)
+        stream = self.db.get_active_stream('test_trigger', dist_traits, current_time)
+        self.assertIsNotNone(stream)
+        self.assertEqual(stream.id, 1)
+        current_time = datetime.datetime(2014,8,3,1,0,0,2)
+        stream = self.db.get_active_stream('test_trigger', dist_traits, current_time)
+        self.assertIsNone(stream)
+
+    def test_stream_ready_to_fire(self):
+        stream = self.db.get_stream_by_id(1)
+        fire_time = datetime.datetime(2014, 8, 2, 12, 21, 2, 2)
+        self.db.stream_ready_to_fire(stream, fire_time)
+        stream = self.db.get_stream_by_id(1)
+        self.assertEqual(stream.fire_timestamp, fire_time)
+
+    def test_get_ready_streams_fire(self):
+        current_time = datetime.datetime(2014,8,12,0,0,0,42)
+        streams = self.db.get_ready_streams(10, current_time)
+        self.assertEqual(len(streams), 2)
+        stream_ids = [stream.id for stream in streams]
+        self.assertIn(3, stream_ids)
+        self.assertIn(4, stream_ids)
+
+        current_time = datetime.datetime(2014,8,10,12,0,0,42)
+        streams = self.db.get_ready_streams(10, current_time)
+        self.assertEqual(len(streams), 1)
+        stream_ids = [stream.id for stream in streams]
+        self.assertIn(3, stream_ids)
+
+        current_time = datetime.datetime(2014,8,12,0,0,0,42)
+        streams = self.db.get_ready_streams(1, current_time)
+        self.assertEqual(len(streams), 1)
+
+    def test_get_ready_streams_expire(self):
+        current_time = datetime.datetime(2014,8,17,0,0,0,42)
+        streams = self.db.get_ready_streams(10, current_time, expire=True)
+        self.assertEqual(len(streams), 4)
+        stream_ids = [stream.id for stream in streams]
+        self.assertIn(1, stream_ids)
+        self.assertIn(2, stream_ids)
+        self.assertIn(3, stream_ids)
+        self.assertIn(4, stream_ids)
+
+        current_time = datetime.datetime(2014,8,10,12,0,0,42)
+        streams = self.db.get_ready_streams(10, current_time, expire=True)
+        self.assertEqual(len(streams), 2)
+        stream_ids = [stream.id for stream in streams]
+        self.assertIn(1, stream_ids)
+        self.assertIn(2, stream_ids)
+
+        current_time = datetime.datetime(2014,8,17,0,0,0,42)
+        streams = self.db.get_ready_streams(1, current_time, expire=True)
+        self.assertEqual(len(streams), 1)
+
+    def test_set_stream_state_success(self):
+        stream = self.db.get_stream_by_id(1)
+        old_serial = stream.state_serial_no
+        new_stream = self.db.set_stream_state(stream, models.StreamState.firing)
+        self.assertEqual(new_stream.state, models.StreamState.firing)
+        self.assertEqual(new_stream.state_serial_no, old_serial + 1)
+
+    def test_set_stream_state_locked(self):
+        stream = self.db.get_stream_by_id(1)
+        self.db.set_stream_state(stream, models.StreamState.firing)
+        with self.assertRaises(db.LockError):
+            self.db.set_stream_state(stream, models.StreamState.firing)
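Taken together, the tests above document the DBInterface surface a worker relies on. A condensed usage sketch, using an in-memory SQLite URL as setUp does; note the tables must exist first, which the tests arrange via models.Base.metadata.create_all:

import datetime
from winchester import db, models

dbi = db.DBInterface(dict(url='sqlite://'))
models.Base.metadata.create_all(dbi.engine)

dbi.create_event('9876-0001-0002', 'test.thing.begin',
                 datetime.datetime(2014, 7, 4, 12, 7, 21),
                 dict(test_string='foobar'))
event = dbi.get_event_by_message_id('9876-0001-0002')

# Poll for streams whose fire/expire timestamps have passed, then lock each
# one by bumping its state; LockError means another worker got there first.
for stream in dbi.get_ready_streams(10, datetime.datetime.utcnow(), expire=True):
    try:
        dbi.set_stream_state(stream, models.StreamState.expiring)
    except db.LockError:
        pass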
diff --git a/tests/test_definition.py b/tests/test_definition.py
new file mode 100644
index 0000000..d6b5e55
--- /dev/null
+++ b/tests/test_definition.py
@@ -0,0 +1,380 @@
+#for Python2.6 compatibility.
+import unittest2 as unittest
+
+import mock
+
+import datetime
+import timex
+
+from winchester import definition
+
+
+class TestCriterion(unittest.TestCase):
+
+    def setUp(self):
+        super(TestCriterion, self).setUp()
+
+    def test_basic_criterion(self):
+        c = definition.Criterion(3, 'foo')
+        self.assertTrue(c.match({'foo': 3}))
+        self.assertFalse(c.match({'foo': 5}))
+        self.assertFalse(c.match({'bar': 5}))
+        self.assertFalse(c.match({'foo': "booga"}))
+
+    def test_numeric_criterion(self):
+        c = definition.NumericCriterion("3", 'foo')
+        self.assertTrue(c.match({'foo': 3}))
+        self.assertFalse(c.match({'foo': 5}))
+        self.assertFalse(c.match({'bar': 5}))
+        self.assertFalse(c.match({'foo': "booga"}))
+        c = definition.NumericCriterion("> 3", 'foo')
+        self.assertFalse(c.match({'foo': 3}))
+        self.assertTrue(c.match({'foo': 5}))
+        c = definition.NumericCriterion("< 3", 'foo')
+        self.assertFalse(c.match({'foo': 3}))
+        self.assertFalse(c.match({'foo': 5}))
+        self.assertTrue(c.match({'foo': 1}))
+        with self.assertRaises(definition.DefinitionError):
+            c = definition.NumericCriterion("zazz", "foo")
+        with self.assertRaises(definition.DefinitionError):
+            c = definition.NumericCriterion("", "foo")
+
+    def test_float_criterion(self):
+        c = definition.FloatCriterion("3.14", 'foo')
+        self.assertTrue(c.match({'foo': 3.14}))
+        self.assertFalse(c.match({'foo': 5.2}))
+        self.assertFalse(c.match({'bar': 5.2}))
+        self.assertFalse(c.match({'foo': "booga"}))
+        c = definition.FloatCriterion("> 3.14", 'foo')
+        self.assertFalse(c.match({'foo': 3.14}))
+        self.assertTrue(c.match({'foo': 5.2}))
+        c = definition.FloatCriterion("< 3.14", 'foo')
+        self.assertFalse(c.match({'foo': 3.14}))
+        self.assertFalse(c.match({'foo': 3.5}))
+        self.assertTrue(c.match({'foo': 3.02}))
+        with self.assertRaises(definition.DefinitionError):
+            c = definition.FloatCriterion("zazz", "foo")
+        with self.assertRaises(definition.DefinitionError):
+            c = definition.FloatCriterion("", "foo")
+
+    def test_time_criterion(self):
+        c = definition.TimeCriterion("day", "foo")
+        e = dict(timestamp=datetime.datetime(2014,8,1,7,52,31,2),
+                 foo=datetime.datetime(2014,8,1,1,2,0,0))
+        self.assertTrue(c.match(e))
+        e = dict(timestamp=datetime.datetime(2014,8,1,7,52,31,2),
+                 foo=datetime.datetime(2014,8,2,1,2,0,0))
+        self.assertFalse(c.match(e))
+        e = dict(timestamp=datetime.datetime(2014,8,1,7,52,31,2),
+                 bar=datetime.datetime(2014,8,1,1,2,0,0))
+        self.assertFalse(c.match(e))
+        e = dict(timestamp=datetime.datetime(2014,8,1,7,52,31,2),
+                 message_id='1234-5678',
+                 quux=4,
+                 foo=datetime.datetime(2014,8,1,1,2,0,0))
+        self.assertTrue(c.match(e))
+
+
+class TestCriteria(unittest.TestCase):
+
+    def setUp(self):
+        super(TestCriteria, self).setUp()
+
+    def test_defaults(self):
+        criteria = definition.Criteria({})
+        self.assertEqual(len(criteria.included_types), 1)
+        self.assertEqual(len(criteria.excluded_types), 0)
+        self.assertEqual(criteria.included_types[0], '*')
+        self.assertEqual(criteria.number, 1)
+        self.assertIsNone(criteria.timestamp)
+        self.assertEqual(len(criteria.map_distinguished_by), 0)
+        self.assertEqual(len(criteria.traits), 0)
+
+    def test_event_type_configs(self):
+        config = dict(event_type="test.foo.bar")
+        criteria = definition.Criteria(config)
+        self.assertEqual(len(criteria.included_types), 1)
+        self.assertEqual(len(criteria.excluded_types), 0)
+        self.assertEqual(criteria.included_types[0], 'test.foo.bar')
+        config = dict(event_type="!test.foo.bar")
+        criteria = definition.Criteria(config)
+        self.assertEqual(len(criteria.included_types), 1)
+        self.assertEqual(len(criteria.excluded_types), 1)
+        self.assertEqual(criteria.included_types[0], '*')
+        self.assertEqual(criteria.excluded_types[0], 'test.foo.bar')
+        config = dict(event_type=["test.foo.bar", "!test.wakka.wakka"])
+        criteria = definition.Criteria(config)
+        self.assertEqual(len(criteria.included_types), 1)
+        self.assertEqual(len(criteria.excluded_types), 1)
+        self.assertEqual(criteria.included_types[0], 'test.foo.bar')
+        self.assertEqual(criteria.excluded_types[0], 'test.wakka.wakka')
+
+    def test_match_type(self):
+        config = dict(event_type=["test.foo.bar", "!test.wakka.wakka"])
+        criteria = definition.Criteria(config)
+        self.assertTrue(criteria.match_type('test.foo.bar'))
+        self.assertFalse(criteria.match_type('test.wakka.wakka'))
+        self.assertFalse(criteria.match_type('test.foo.baz'))
+        config = dict(event_type=["test.foo.*", "!test.wakka.*"])
+        criteria = definition.Criteria(config)
+        self.assertTrue(criteria.match_type('test.foo.bar'))
+        self.assertTrue(criteria.match_type('test.foo.baz'))
+        self.assertFalse(criteria.match_type('test.wakka.wakka'))
+
+    def test_match_for_type(self):
+        config = dict(event_type=["test.foo.*", "!test.wakka.*"])
+        criteria = definition.Criteria(config)
+        event1 = dict(event_type = "test.foo.zazz")
+        event2 = dict(event_type = "test.wakka.zazz")
+        event3 = dict(event_type = "test.boingy")
+        self.assertTrue(criteria.match(event1))
+        self.assertFalse(criteria.match(event2))
+        self.assertFalse(criteria.match(event3))
+
+    def test_match_for_timestamp(self):
+        config = dict(timestamp='day($launched_at)')
+        criteria = definition.Criteria(config)
+        event1 = dict(event_type='test.thing',
+                      timestamp=datetime.datetime(2014,8,1,17,16,15,14),
+                      launched_at=datetime.datetime(2014,8,1,1,2,3,4))
+        event2 = dict(event_type='test.thing',
+                      timestamp=datetime.datetime(2014,8,2,17,16,15,14),
+                      launched_at=datetime.datetime(2014,8,1,1,2,3,4))
+        event3 = dict(event_type='test.thing',
+                      timestamp=datetime.datetime(2014,8,2,17,16,15,14))
+        self.assertTrue(criteria.match(event1))
+        self.assertFalse(criteria.match(event2))
+        self.assertFalse(criteria.match(event3))
+
+    def test_match_for_traits(self):
+        config = dict(traits=dict(some_trait="test",
+                                  launched_at={'datetime': "day"},
+                                  memory_mb={'int': "> 2048"},
+                                  test_weight={'float': "< 4.02"},
+                                  other_trait={'string': 'text here'}))
+        criteria = definition.Criteria(config)
+        event1 = dict(event_type='test.thing',
+                      timestamp=datetime.datetime(2014,8,1,17,16,15,14),
+                      launched_at=datetime.datetime(2014,8,1,1,2,3,4),
+                      some_trait='test',
+                      other_trait='text here',
+                      memory_mb=4096,
+                      test_weight=3.1415)
+        event2 = dict(event_type='test.thing',
+                      timestamp=datetime.datetime(2014,8,1,17,16,15,14),
+                      launched_at=datetime.datetime(2014,8,1,1,2,3,4),
+                      some_trait='foo',
+                      other_trait='text here',
+                      memory_mb=4096,
+                      test_weight=3.1415)
+        event3 = dict(event_type='test.thing',
+                      timestamp=datetime.datetime(2014,8,1,17,16,15,14),
+                      launched_at=datetime.datetime(2014,8,1,1,2,3,4),
+                      other_trait='text here',
+                      memory_mb=4096,
+                      test_weight=3.1415)
+        event4 = dict(event_type='test.thing',
+                      timestamp=datetime.datetime(2014,8,1,17,16,15,14),
+                      launched_at=datetime.datetime(2014,8,2,1,2,3,4),
+                      some_trait='test',
+                      other_trait='text here',
+                      memory_mb=4096,
+                      test_weight=3.1415)
+        event5 = dict(event_type='test.thing',
+                      timestamp=datetime.datetime(2014,8,1,17,16,15,14),
+                      launched_at=datetime.datetime(2014,8,1,1,2,3,4),
+                      some_trait='test',
+                      other_trait='text here',
+                      memory_mb=1024,
+                      test_weight=3.1415)
+        event6 = dict(event_type='test.thing',
+                      timestamp=datetime.datetime(2014,8,1,17,16,15,14),
+                      launched_at=datetime.datetime(2014,8,1,1,2,3,4),
+                      some_trait='test',
+                      other_trait='text here',
+                      memory_mb=4096,
+                      test_weight=6.283)
+        self.assertTrue(criteria.match(event1))
+        self.assertFalse(criteria.match(event2))
+        self.assertFalse(criteria.match(event3))
+        self.assertFalse(criteria.match(event4))
+        self.assertFalse(criteria.match(event5))
+        self.assertFalse(criteria.match(event6))
+
+class TestTriggerDefinition(unittest.TestCase):
+
+    def setUp(self):
+        super(TestTriggerDefinition, self).setUp()
+
+    def test_config_error_check_and_defaults(self):
+        with self.assertRaises(definition.DefinitionError):
+            definition.TriggerDefinition(dict())
+        with self.assertRaises(definition.DefinitionError):
+            definition.TriggerDefinition(dict(name='test_trigger'))
+        with self.assertRaises(definition.DefinitionError):
+            definition.TriggerDefinition(dict(name='test_trigger',
+                                              expiration='$last + 1d'))
+        with self.assertRaises(definition.DefinitionError):
+            definition.TriggerDefinition(dict(name='test_trigger',
+                                              expiration='$last + 1d',
+                                              fire_pipeline='test_pipeline'))
+        with self.assertRaises(definition.DefinitionError):
+            definition.TriggerDefinition(
+                dict(name='test_trigger',
+                     expiration='$last + 1d',
+                     fire_pipeline='test_pipeline',
+                     fire_criteria=[dict(event_type='test.thing')]))
+        tdef = definition.TriggerDefinition(
+            dict(name='test_trigger',
+                 expiration='$last + 1d',
+                 fire_pipeline='test_pipeline',
+                 fire_criteria=[dict(event_type='test.thing')],
+                 match_criteria=[dict(event_type='test.*')]))
+        self.assertEqual(len(tdef.distinguished_by), 0)
+        self.assertEqual(len(tdef.fire_criteria), 1)
+        self.assertIsInstance(tdef.fire_criteria[0], definition.Criteria)
+        self.assertEqual(len(tdef.match_criteria), 1)
+        self.assertIsInstance(tdef.match_criteria[0], definition.Criteria)
+        self.assertEqual(tdef.fire_delay, 0)
+        self.assertEqual(len(tdef.load_criteria), 0)
+
+    def test_match_for_criteria(self):
+        config = dict(name='test_trigger',
+                      expiration='$last + 1d',
+                      fire_pipeline='test_pipeline',
+                      fire_criteria=[dict(event_type='test.thing')],
+                      match_criteria=[dict(event_type='test.*')])
+        tdef = definition.TriggerDefinition(config)
+        event1 = dict(event_type='test.thing')
+        event2 = dict(event_type='other.thing')
+        self.assertTrue(tdef.match(event1))
+        self.assertFalse(tdef.match(event2))
+        config = dict(name='test_trigger',
+                      expiration='$last + 1d',
+                      fire_pipeline='test_pipeline',
+                      fire_criteria=[dict(event_type='test.thing')],
+                      match_criteria=[dict(event_type='test.*'),
+                                      dict(event_type='other.*')])
+        tdef = definition.TriggerDefinition(config)
+        self.assertTrue(tdef.match(event1))
+        self.assertTrue(tdef.match(event2))
+
+    def test_match_for_distinguished_traits(self):
+        config = dict(name='test_trigger',
+                      expiration='$last + 1d',
+                      distinguished_by=['instance_id'],
+                      fire_pipeline='test_pipeline',
+                      fire_criteria=[dict(event_type='test.thing')],
+                      match_criteria=[dict(event_type='test.*')])
+        tdef = definition.TriggerDefinition(config)
+        event1 = dict(event_type='test.thing', instance_id='foo')
+        event2 = dict(event_type='test.thing')
+        self.assertTrue(tdef.match(event1))
+        self.assertFalse(tdef.match(event2))
+
+    def test_get_distinguished_traits(self):
+        config = dict(name='test_trigger',
+                      expiration='$last + 1d',
+                      distinguished_by=['instance_id'],
+                      fire_pipeline='test_pipeline',
+                      fire_criteria=[dict(event_type='test.thing')],
+                      match_criteria=[dict(event_type='test.*')])
+        event1 = dict(event_type='test.thing', instance_id='foo')
+        tdef = definition.TriggerDefinition(config)
+        mcriteria = tdef.match(event1)
+        dt = tdef.get_distinguishing_traits(event1, mcriteria)
+        self.assertEqual(len(dt), 1)
+        self.assertIn('instance_id', dt)
+        self.assertEqual(dt['instance_id'], 'foo')
+
+    def test_get_distinguished_traits_with_timeexpression(self):
+        config = dict(name='test_trigger',
+                      expiration='$last + 1d',
+                      distinguished_by=['instance_id', dict(timestamp='day')],
+                      fire_pipeline='test_pipeline',
+                      fire_criteria=[dict(event_type='test.thing')],
+                      match_criteria=[dict(event_type='test.*')])
+        event1 = dict(event_type='test.thing', instance_id='foo',
+                      timestamp=datetime.datetime(2014,8,1,20,4,23,444))
+        tdef = definition.TriggerDefinition(config)
+        mcriteria = tdef.match(event1)
+        dt = tdef.get_distinguishing_traits(event1, mcriteria)
+        self.assertEqual(len(dt), 2)
+        self.assertIn('instance_id', dt)
+        self.assertEqual(dt['instance_id'], 'foo')
+        timerange = timex.TimeRange(datetime.datetime(2014,8,1,0,0,0,0),
+                                    datetime.datetime(2014,8,2,0,0,0,0))
+        self.assertIn('timestamp', dt)
+        self.assertIsInstance(dt['timestamp'], timex.TimeRange)
+        self.assertEqual(dt['timestamp'].begin, timerange.begin)
+        self.assertEqual(dt['timestamp'].end, timerange.end)
+
+    def test_get_distinguished_traits_with_map(self):
+        config = dict(name='test_trigger',
+                      expiration='$last + 1d',
+                      distinguished_by=['instance_id'],
+                      fire_pipeline='test_pipeline',
+                      fire_criteria=[dict(event_type='test.thing')],
+                      match_criteria=[dict(event_type='test.*',
+                                           map_distinguished_by=dict(instance_id='other_id'))])
+        event1 = dict(event_type='test.thing', instance_id='foo',
+                      other_id='bar')
+        tdef = definition.TriggerDefinition(config)
+        mcriteria = tdef.match(event1)
+        dt = tdef.get_distinguishing_traits(event1, mcriteria)
+        self.assertEqual(len(dt), 1)
+        self.assertIn('instance_id', dt)
+        self.assertEqual(dt['instance_id'], 'bar')
+
+    def test_get_fire_timestamp(self):
+        config = dict(name='test_trigger',
+                      expiration='$last + 1d',
+                      distinguished_by=['instance_id'],
+                      fire_pipeline='test_pipeline',
+                      fire_criteria=[dict(event_type='test.thing')],
+                      match_criteria=[dict(event_type='test.*')])
+        tdef = definition.TriggerDefinition(config)
+        test_time = datetime.datetime(2014,8,1,20,4,23,444)
+        test_time_plus_1hr = datetime.datetime(2014,8,1,21,4,23,444)
+        ft = tdef.get_fire_timestamp(test_time)
+        self.assertEqual(ft, test_time)
+        config = dict(name='test_trigger',
+                      expiration='$last + 1d',
+                      fire_delay=3600,
+                      distinguished_by=['instance_id'],
+                      fire_pipeline='test_pipeline',
+                      fire_criteria=[dict(event_type='test.thing')],
+                      match_criteria=[dict(event_type='test.*')])
+        tdef = definition.TriggerDefinition(config)
+        ft = tdef.get_fire_timestamp(test_time)
+        self.assertEqual(ft, test_time_plus_1hr)
+
+    def test_should_fire(self):
+        config = dict(name='test_trigger',
+                      expiration='$last + 1d',
+                      distinguished_by=['instance_id'],
+                      fire_pipeline='test_pipeline',
+                      fire_criteria=[dict(event_type='test.thing')],
+                      match_criteria=[dict(event_type='test.*')])
+        tdef = definition.TriggerDefinition(config)
+        events1 = [dict(event_type='test.foobar'),
+                   dict(event_type='test.thing'),
+                   dict(event_type='test.thing')]
+        events2 = [dict(event_type='test.foobar'),
+                   dict(event_type='test.thing')]
+        events3 = [dict(event_type='test.foobar'),
+                   dict(event_type='test.whatsit')]
+        self.assertTrue(tdef.should_fire(events1))
+        self.assertTrue(tdef.should_fire(events2))
+        self.assertFalse(tdef.should_fire(events3))
+        config = dict(name='test_trigger',
+                      expiration='$last + 1d',
+                      distinguished_by=['instance_id'],
+                      fire_pipeline='test_pipeline',
+                      fire_criteria=[dict(event_type='test.thing', number=2)],
+                      match_criteria=[dict(event_type='test.*')])
+        tdef = definition.TriggerDefinition(config)
+        self.assertTrue(tdef.should_fire(events1))
+        self.assertFalse(tdef.should_fire(events2))
+        self.assertFalse(tdef.should_fire(events3))
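Before the pipeline manager tests below, it is worth spelling out the handler contract they imply: a handler class is instantiated with its config params, handle_events(events, env) may transform or extend the event list, and commit() or rollback() is called depending on whether the whole pipeline ran cleanly. A minimal conforming handler might look like this hypothetical example, which is not shipped in the patch:

class CountingHandler(object):
    """Hypothetical handler satisfying the contract used by Pipeline."""

    def __init__(self, **params):
        # receives the 'params' dict from pipelines.yaml
        self.params = params
        self.counted = 0

    def handle_events(self, events, env):
        self.counted += len(events)
        return events  # may also append new events to be re-injected

    def commit(self):
        pass  # called once the whole pipeline succeeded

    def rollback(self):
        pass  # called if any handler raised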
diff --git a/tests/test_pipeline_manager.py b/tests/test_pipeline_manager.py
new file mode 100644
index 0000000..d9f2d37
--- /dev/null
+++ b/tests/test_pipeline_manager.py
@@ -0,0 +1,492 @@
+import unittest2 as unittest
+
+import mock
+
+import datetime
+import timex
+
+from winchester import pipeline_manager
+from winchester import db as winch_db
+from winchester.models import StreamState
+
+
+class TestPipeline(unittest.TestCase):
+
+    def test_check_handler_config(self):
+
+        handler_map = {'test_thing': "blah"}
+        c = pipeline_manager.Pipeline.check_handler_config("test_thing", handler_map)
+        self.assertIsInstance(c, dict)
+        self.assertIn('name', c)
+        self.assertIn('params', c)
+        self.assertIsInstance(c['params'], dict)
+        self.assertEqual(c['name'], 'test_thing')
+        self.assertEqual(c['params'], {})
+
+        conf = dict(name='test_thing')
+        c = pipeline_manager.Pipeline.check_handler_config(conf, handler_map)
+        self.assertIsInstance(c, dict)
+        self.assertIn('name', c)
+        self.assertIn('params', c)
+        self.assertIsInstance(c['params'], dict)
+        self.assertEqual(c['name'], 'test_thing')
+        self.assertEqual(c['params'], {})
+
+        conf = dict(name='test_thing', params={'book': 42})
+        c = pipeline_manager.Pipeline.check_handler_config(conf, handler_map)
+        self.assertIsInstance(c, dict)
+        self.assertIn('name', c)
+        self.assertIn('params', c)
+        self.assertIsInstance(c['params'], dict)
+        self.assertEqual(c['name'], 'test_thing')
+        self.assertEqual(c['params'], {'book': 42})
+
+        with self.assertRaises(pipeline_manager.PipelineConfigError):
+            c = pipeline_manager.Pipeline.check_handler_config("other_thing", handler_map)
+
+        with self.assertRaises(pipeline_manager.PipelineConfigError):
+            conf = dict(params={'book': 42})
+            c = pipeline_manager.Pipeline.check_handler_config(conf, handler_map)
+
+    def test_init(self):
+        conf = [dict(name='test_thing', params={'book': 42})]
+        handler_class = mock.MagicMock()
+        handler_map = {'test_thing': handler_class}
+        p = pipeline_manager.Pipeline("test_pipeline", conf, handler_map)
+        self.assertEqual(p.name, "test_pipeline")
+        self.assertEqual(len(p.handlers), 1)
+        self.assertIs(handler_class.return_value, p.handlers[0])
+        handler_class.assert_called_once_with(book=42)
+
+    def test_handle_events(self):
+        test_events = [dict(message_id="t000-0001"),
+                       dict(message_id="t000-0002"),
+                       dict(message_id="t000-0003")]
+        new_events = [dict(message_id="t000-0004")]
+        conf = [dict(name='test_thing', params={}),
+                dict(name='other_thing', params={}),
+                dict(name='some_thing', params={})]
+        handler_class1 = mock.MagicMock(name='handler1')
+        handler_class2 = mock.MagicMock(name='handler2')
+        handler_class3 = mock.MagicMock(name='handler3')
+        handler_class3.return_value.handle_events.return_value = test_events + new_events
+
+        handler_map = {'test_thing': handler_class1,
+                       'other_thing': handler_class2,
+                       'some_thing': handler_class3}
+        p = pipeline_manager.Pipeline("test_pipeline", conf, handler_map)
+        p.commit = mock.MagicMock(name='commit')
+        p.rollback = mock.MagicMock(name='rollback')
+
+        ret = p.handle_events(test_events)
+        handler_class1.return_value.handle_events.assert_called_once_with(test_events, p.env)
+        events1 = handler_class1.return_value.handle_events.return_value
+        handler_class2.return_value.handle_events.assert_called_once_with(events1, p.env)
+        events2 = handler_class2.return_value.handle_events.return_value
+        handler_class3.return_value.handle_events.assert_called_once_with(events2, p.env)
+        p.commit.assert_called_once_with()
+        self.assertFalse(p.rollback.called)
+        self.assertEqual(ret, new_events)
+
+    def test_handle_events_error(self):
+        test_events = [dict(message_id="t000-0001"),
+                       dict(message_id="t000-0002"),
+                       dict(message_id="t000-0003")]
+        conf = [dict(name='test_thing', params={}),
+                dict(name='other_thing', params={}),
+                dict(name='some_thing', params={})]
+        handler_class1 = mock.MagicMock(name='handler1')
+        handler_class2 = mock.MagicMock(name='handler2')
+        handler_class3 = mock.MagicMock(name='handler3')
+
+        class WhackyError(Exception):
+            pass
+
+        handler_class2.return_value.handle_events.side_effect = WhackyError("whoa!")
+
+        handler_map = {'test_thing': handler_class1,
+                       'other_thing': handler_class2,
+                       'some_thing': handler_class3}
+        p = pipeline_manager.Pipeline("test_pipeline", conf, handler_map)
+        p.commit = mock.MagicMock(name='commit')
+        p.rollback = mock.MagicMock(name='rollback')
+
+        with self.assertRaises(pipeline_manager.PipelineExecutionError):
+            ret = p.handle_events(test_events)
+        p.rollback.assert_called_once_with()
+        self.assertFalse(p.commit.called)
+
+    def test_commit(self):
+        conf = [dict(name='test_thing', params={}),
+                dict(name='other_thing', params={}),
+                dict(name='some_thing', params={})]
+        handler_class1 = mock.MagicMock(name='handler1')
+        handler_class2 = mock.MagicMock(name='handler2')
+        handler_class3 = mock.MagicMock(name='handler3')
+
+        handler_map = {'test_thing': handler_class1,
+                       'other_thing': handler_class2,
+                       'some_thing': handler_class3}
+        p = pipeline_manager.Pipeline("test_pipeline", conf, handler_map)
+        p.commit()
+        handler_class1.return_value.commit.assert_called_once_with()
+        handler_class2.return_value.commit.assert_called_once_with()
+        handler_class3.return_value.commit.assert_called_once_with()
+
+    def test_commit_with_error(self):
+        conf = [dict(name='test_thing', params={}),
+                dict(name='other_thing', params={}),
+                dict(name='some_thing', params={})]
+        handler_class1 = mock.MagicMock(name='handler1')
+        handler_class2 = mock.MagicMock(name='handler2')
+        handler_class3 = mock.MagicMock(name='handler3')
+
+        class WhackyError(Exception):
+            pass
+
+        handler_class2.return_value.commit.side_effect = WhackyError("whoa!")
+
+        handler_map = {'test_thing': handler_class1,
+                       'other_thing': handler_class2,
+                       'some_thing': handler_class3}
+        p = pipeline_manager.Pipeline("test_pipeline", conf, handler_map)
+        p.commit()
+        handler_class1.return_value.commit.assert_called_once_with()
+        handler_class2.return_value.commit.assert_called_once_with()
+        handler_class3.return_value.commit.assert_called_once_with()
+
+    def test_rollback(self):
+        conf = [dict(name='test_thing', params={}),
+                dict(name='other_thing', params={}),
+                dict(name='some_thing', params={})]
+        handler_class1 = mock.MagicMock(name='handler1')
+        handler_class2 = mock.MagicMock(name='handler2')
+        handler_class3 = mock.MagicMock(name='handler3')
+
+        handler_map = {'test_thing': handler_class1,
+                       'other_thing': handler_class2,
+                       'some_thing': handler_class3}
+        p = pipeline_manager.Pipeline("test_pipeline", conf, handler_map)
+        p.rollback()
+        handler_class1.return_value.rollback.assert_called_once_with()
+        handler_class2.return_value.rollback.assert_called_once_with()
+        handler_class3.return_value.rollback.assert_called_once_with()
+
+    def test_rollback_with_error(self):
+        conf = [dict(name='test_thing', params={}),
+                dict(name='other_thing', params={}),
+                dict(name='some_thing', params={})]
+        handler_class1 = mock.MagicMock(name='handler1')
+        handler_class2 = mock.MagicMock(name='handler2')
+        handler_class3 = mock.MagicMock(name='handler3')
+
+        class WhackyError(Exception):
+            pass
+
+        handler_class2.return_value.rollback.side_effect = WhackyError("whoa!")
+
+        handler_map = {'test_thing': handler_class1,
+                       'other_thing': handler_class2,
+                       'some_thing': handler_class3}
+        p = pipeline_manager.Pipeline("test_pipeline", conf, handler_map)
+        p.rollback()
+        handler_class1.return_value.rollback.assert_called_once_with()
+        handler_class2.return_value.rollback.assert_called_once_with()
+        handler_class3.return_value.rollback.assert_called_once_with()
+
+
+class TestPipelineManager(unittest.TestCase):
+
+    def setUp(self):
+        super(TestPipelineManager, self).setUp()
+
+    @mock.patch.object(pipeline_manager.ConfigManager, 'wrap')
+    def test_complete_stream(self, mock_config_wrap):
+        pm = pipeline_manager.PipelineManager('test')
+        pm.db = mock.MagicMock(spec=pm.db)
+        stream = "test stream"
+        pm._complete_stream(stream)
+        pm.db.set_stream_state.assert_called_once_with(stream, StreamState.completed)
+
+    @mock.patch.object(pipeline_manager.ConfigManager, 'wrap')
+    def test_error_stream(self, mock_config_wrap):
+        pm = pipeline_manager.PipelineManager('test')
+        pm.db = mock.MagicMock(spec=pm.db)
+        stream = "test stream"
+        pm._error_stream(stream)
+        pm.db.set_stream_state.assert_called_once_with(stream, StreamState.error)
+
+    @mock.patch.object(pipeline_manager.ConfigManager, 'wrap')
+    def test_expire_error_stream(self, mock_config_wrap):
+        pm = pipeline_manager.PipelineManager('test')
+        pm.db = mock.MagicMock(spec=pm.db)
+        stream = "test stream"
+        pm._expire_error_stream(stream)
+        pm.db.set_stream_state.assert_called_once_with(stream, StreamState.expire_error)
+
+    @mock.patch('winchester.pipeline_manager.Pipeline', autospec=True)
+    @mock.patch.object(pipeline_manager.ConfigManager, 'wrap')
+    def test_run_pipeline(self, mock_config_wrap, mock_pipeline):
+        pm = pipeline_manager.PipelineManager('test')
+        pm.db = mock.MagicMock(spec=pm.db, name='db')
+        trigger_def = mock.MagicMock(name='trigger_def')
+        pipeline_name = "test"
+        pipeline_config = mock.MagicMock(name='pipeline_config')
+        stream = mock.MagicMock(name='stream')
+        pm.add_new_events = mock.MagicMock(name='add_new_events')
+        pm.pipeline_handlers = mock.MagicMock(name='pipeline_handlers')
+
+        ret = pm._run_pipeline(stream, trigger_def, pipeline_name, pipeline_config)
+        pm.db.get_stream_events.assert_called_once_with(stream)
+        mock_pipeline.assert_called_once_with(pipeline_name, pipeline_config, pm.pipeline_handlers)
+
+        pipeline = mock_pipeline.return_value
+        pipeline.handle_events.assert_called_once_with(
+            pm.db.get_stream_events.return_value)
+        pm.add_new_events.assert_called_once_with(
+            mock_pipeline.return_value.handle_events.return_value)
+        self.assertTrue(ret)
+
+    @mock.patch('winchester.pipeline_manager.Pipeline', autospec=True)
+    @mock.patch.object(pipeline_manager.ConfigManager, 'wrap')
+    def test_run_pipeline_with_error(self, mock_config_wrap, mock_pipeline):
+        pm = pipeline_manager.PipelineManager('test')
+        pm.db = mock.MagicMock(spec=pm.db, name='db')
+        trigger_def = mock.MagicMock(name='trigger_def')
+        pipeline_name = "test"
+        pipeline_config = mock.MagicMock(name='pipeline_config')
+        stream = mock.MagicMock(name='stream')
+        pm.add_new_events = mock.MagicMock(name='add_new_events')
+        pm.pipeline_handlers = mock.MagicMock(name='pipeline_handlers')
+        pipeline = mock_pipeline.return_value
+        pipeline.handle_events.side_effect = pipeline_manager.PipelineExecutionError('test', 'thing')
+
+        ret = pm._run_pipeline(stream, trigger_def, pipeline_name, pipeline_config)
+
+        pm.db.get_stream_events.assert_called_once_with(stream)
+        mock_pipeline.assert_called_once_with(pipeline_name, pipeline_config, pm.pipeline_handlers)
+
+        pipeline.handle_events.assert_called_once_with(
+            pm.db.get_stream_events.return_value)
+        self.assertFalse(pm.add_new_events.called)
+        self.assertFalse(ret)
+
+    @mock.patch.object(pipeline_manager.ConfigManager, 'wrap')
+    def test_fire_stream(self, mock_config_wrap):
+        pm = pipeline_manager.PipelineManager('test')
+        stream = mock.MagicMock(name='stream')
+        stream.name = 'test'
+        pm.db = mock.MagicMock(spec=pm.db, name='db')
+        pm.db.set_stream_state.return_value = stream
+        trigger_def = mock.MagicMock(name='trigger_def')
+        trigger_def.fire_pipeline = 'test_fire_pipeline'
+        pm.trigger_map = dict(test=trigger_def)
+        pipeline_config = mock.MagicMock(name='pipeline_config')
+        pm.pipeline_config = dict(test_fire_pipeline=pipeline_config)
+        pm._error_stream = mock.MagicMock(name='_error_stream')
+        pm._complete_stream = mock.MagicMock(name='_complete_stream')
+        pm._run_pipeline = mock.MagicMock(name='_run_pipeline')
+        pm._run_pipeline.return_value = True
+
+        ret = pm.fire_stream("test stream")
+        pm.db.set_stream_state.assert_called_once_with("test stream", StreamState.firing)
+        pm._run_pipeline.assert_called_once_with(stream, trigger_def, 'test_fire_pipeline', pipeline_config)
+        self.assertFalse(pm._error_stream.called)
+        pm._complete_stream.assert_called_once_with(stream)
+        self.assertTrue(ret)
+
+    @mock.patch.object(pipeline_manager.ConfigManager, 'wrap')
+    def test_fire_stream_locked(self, mock_config_wrap):
+        pm = pipeline_manager.PipelineManager('test')
+        stream = mock.MagicMock(name='stream')
+        stream.name = 'test'
+        pm.db = mock.MagicMock(spec=pm.db, name='db')
+        pm.db.set_stream_state.side_effect = winch_db.LockError('locked!')
+        trigger_def = mock.MagicMock(name='trigger_def')
+        trigger_def.fire_pipeline = 'test_fire_pipeline'
+        pm.trigger_map = dict(test=trigger_def)
+        pipeline_config = mock.MagicMock(name='pipeline_config')
+        pm.pipeline_config = dict(test_fire_pipeline=pipeline_config)
+        pm._error_stream = mock.MagicMock(name='_error_stream')
+        pm._complete_stream = mock.MagicMock(name='_complete_stream')
+        pm._run_pipeline = mock.MagicMock(name='_run_pipeline')
+        pm._run_pipeline.return_value = True
+
+        ret = pm.fire_stream(stream)
+        pm.db.set_stream_state.assert_called_once_with(stream, StreamState.firing)
+        self.assertFalse(pm._run_pipeline.called)
+        self.assertFalse(pm._error_stream.called)
+        self.assertFalse(pm._complete_stream.called)
+        self.assertFalse(ret)
+
+    @mock.patch.object(pipeline_manager.ConfigManager, 'wrap')
+    def test_fire_stream_no_pipeline(self, mock_config_wrap):
+        pm = pipeline_manager.PipelineManager('test')
+        stream = mock.MagicMock(name='stream')
+        stream.name = 'test'
+        pm.db = mock.MagicMock(spec=pm.db, name='db')
+        pm.db.set_stream_state.return_value = stream
+        trigger_def = mock.MagicMock(name='trigger_def')
+        trigger_def.fire_pipeline = None
+        pm.trigger_map = dict(test=trigger_def)
+        pm._error_stream = mock.MagicMock(name='_error_stream')
+        pm._complete_stream = mock.MagicMock(name='_complete_stream')
+        pm._run_pipeline = mock.MagicMock(name='_run_pipeline')
+        pm._run_pipeline.return_value = True
+
+        ret = pm.fire_stream("test stream")
+        pm.db.set_stream_state.assert_called_once_with("test stream", StreamState.firing)
+        self.assertFalse(pm._error_stream.called)
+        self.assertFalse(pm._run_pipeline.called)
+        pm._complete_stream.assert_called_once_with(stream)
+        self.assertTrue(ret)
+
+    @mock.patch.object(pipeline_manager.ConfigManager, 'wrap')
+    def test_fire_stream_error(self, mock_config_wrap):
+        pm = pipeline_manager.PipelineManager('test')
+        stream = mock.MagicMock(name='stream')
+        stream.name = 'test'
+        pm.db = mock.MagicMock(spec=pm.db, name='db')
+        pm.db.set_stream_state.return_value = stream
+        trigger_def = mock.MagicMock(name='trigger_def')
+        trigger_def.fire_pipeline = 'test_fire_pipeline'
+        pm.trigger_map = dict(test=trigger_def)
+        pipeline_config = mock.MagicMock(name='pipeline_config')
+        pm.pipeline_config = dict(test_fire_pipeline=pipeline_config)
+        pm._error_stream = mock.MagicMock(name='_error_stream')
+        pm._complete_stream = mock.MagicMock(name='_complete_stream')
+        pm._run_pipeline = mock.MagicMock(name='_run_pipeline')
+        pm._run_pipeline.return_value = False
+
+        ret = pm.fire_stream("test stream")
+        pm.db.set_stream_state.assert_called_once_with("test stream", StreamState.firing)
+        pm._run_pipeline.assert_called_once_with(stream, trigger_def, 'test_fire_pipeline', pipeline_config)
+        self.assertFalse(pm._complete_stream.called)
+        pm._error_stream.assert_called_once_with(stream)
+        self.assertFalse(ret)
+
+    @mock.patch.object(pipeline_manager.ConfigManager, 'wrap')
+    def test_expire_stream(self, mock_config_wrap):
+        pm = pipeline_manager.PipelineManager('test')
+        stream = mock.MagicMock(name='stream')
+        stream.name = 'test'
+        pm.db = mock.MagicMock(spec=pm.db, name='db')
+        pm.db.set_stream_state.return_value = stream
+        trigger_def = mock.MagicMock(name='trigger_def')
+        trigger_def.expire_pipeline = 'test_fire_pipeline'
+        pm.trigger_map = dict(test=trigger_def)
+        pipeline_config = mock.MagicMock(name='pipeline_config')
+        pm.pipeline_config = dict(test_fire_pipeline=pipeline_config)
+        pm._error_stream = mock.MagicMock(name='_error_stream')
+        pm._complete_stream = mock.MagicMock(name='_complete_stream')
+        pm._run_pipeline = mock.MagicMock(name='_run_pipeline')
+        pm._run_pipeline.return_value = True
+
+        ret = pm.expire_stream("test stream")
+        pm.db.set_stream_state.assert_called_once_with("test stream", StreamState.expiring)
pipeline_config) + self.assertFalse(pm._error_stream.called) + pm._complete_stream.assert_called_once_with(stream) + self.assertTrue(ret) + + @mock.patch.object(pipeline_manager.ConfigManager, 'wrap') + def test_expire_stream_locked(self, mock_config_wrap): + pm = pipeline_manager.PipelineManager('test') + stream = mock.MagicMock(name='stream') + stream.name = 'test' + pm.db = mock.MagicMock(spec=pm.db, name='db') + pm.db.set_stream_state.side_effect = winch_db.LockError('locked!') + trigger_def = mock.MagicMock(name='trigger_def') + trigger_def.expire_pipeline = 'test_fire_pipeline' + pm.trigger_map = dict(test=trigger_def) + pipeline_config = mock.MagicMock(name='pipeline_config') + pm.pipeline_config = dict(test_fire_pipeline=pipeline_config) + pm._expire_error_stream = mock.MagicMock(name='_expire_error_stream') + pm._complete_stream = mock.MagicMock(name='_complete_stream') + pm._run_pipeline = mock.MagicMock(name='_run_pipeline') + pm._run_pipeline.return_value = True + + ret = pm.expire_stream(stream) + pm.db.set_stream_state.assert_called_once_with(stream, StreamState.expiring) + self.assertFalse(pm._run_pipeline.called) + self.assertFalse(pm._expire_error_stream.called) + self.assertFalse(pm._complete_stream.called) + self.assertFalse(ret) + + @mock.patch.object(pipeline_manager.ConfigManager, 'wrap') + def test_expire_stream_no_pipeline(self, mock_config_wrap): + pm = pipeline_manager.PipelineManager('test') + stream = mock.MagicMock(name='stream') + stream.name = 'test' + pm.db = mock.MagicMock(spec=pm.db, name='db') + pm.db.set_stream_state.return_value = stream + trigger_def = mock.MagicMock(name='trigger_def') + trigger_def.expire_pipeline = None + pm.trigger_map = dict(test=trigger_def) + pm._expire_error_stream = mock.MagicMock(name='_expire_error_stream') + pm._complete_stream = mock.MagicMock(name='_complete_stream') + pm._run_pipeline = mock.MagicMock(name='_run_pipeline') + pm._run_pipeline.return_value = True + + ret = pm.expire_stream("test stream") + pm.db.set_stream_state.assert_called_once_with("test stream", StreamState.expiring) + self.assertFalse(pm._expire_error_stream.called) + self.assertFalse(pm._run_pipeline.called) + pm._complete_stream.assert_called_once_with(stream) + self.assertTrue(ret) + + @mock.patch.object(pipeline_manager.ConfigManager, 'wrap') + def test_expire_stream_error(self, mock_config_wrap): + pm = pipeline_manager.PipelineManager('test') + stream = mock.MagicMock(name='stream') + stream.name = 'test' + pm.db = mock.MagicMock(spec=pm.db, name='db') + pm.db.set_stream_state.return_value = stream + trigger_def = mock.MagicMock(name='trigger_def') + trigger_def.expire_pipeline = 'test_fire_pipeline' + pm.trigger_map = dict(test=trigger_def) + pipeline_config = mock.MagicMock(name='pipeline_config') + pm.pipeline_config = dict(test_fire_pipeline=pipeline_config) + pm._expire_error_stream = mock.MagicMock(name='_expire_error_stream') + pm._complete_stream = mock.MagicMock(name='_complete_stream') + pm._run_pipeline = mock.MagicMock(name='_run_pipeline') + pm._run_pipeline.return_value = False + + ret = pm.expire_stream("test stream") + pm.db.set_stream_state.assert_called_once_with("test stream", StreamState.expiring) + pm._run_pipeline.assert_called_once_with(stream, trigger_def, 'test_fire_pipeline', pipeline_config) + self.assertFalse(pm._complete_stream.called) + pm._expire_error_stream.assert_called_once_with(stream) + self.assertFalse(ret) + + @mock.patch.object(pipeline_manager.ConfigManager, 'wrap') + def 
test_process_ready_streams_fire(self, mock_config_wrap): + pm = pipeline_manager.PipelineManager('test') + pm.db = mock.MagicMock(spec=pm.db, name='db') + stream = mock.MagicMock(name='stream') + pm.expire_stream = mock.MagicMock(name='expire_stream') + pm.fire_stream = mock.MagicMock(name='fire_stream') + pm.current_time = mock.MagicMock(name='current_time') + pm.db.get_ready_streams.return_value = [stream] + + ret = pm.process_ready_streams(42) + pm.db.get_ready_streams.assert_called_once_with(42, pm.current_time.return_value, expire=False) + pm.fire_stream.assert_called_once_with(stream) + self.assertFalse(pm.expire_stream.called) + self.assertEqual(ret, 1) + + @mock.patch.object(pipeline_manager.ConfigManager, 'wrap') + def test_process_ready_streams_expire(self, mock_config_wrap): + pm = pipeline_manager.PipelineManager('test') + pm.db = mock.MagicMock(spec=pm.db, name='db') + stream = mock.MagicMock(name='stream') + pm.expire_stream = mock.MagicMock(name='expire_stream') + pm.fire_stream = mock.MagicMock(name='fire_stream') + pm.current_time = mock.MagicMock(name='current_time') + pm.db.get_ready_streams.return_value = [stream] + + ret = pm.process_ready_streams(42, expire=True) + pm.db.get_ready_streams.assert_called_once_with(42, pm.current_time.return_value, expire=True) + pm.expire_stream.assert_called_once_with(stream) + self.assertFalse(pm.fire_stream.called) + self.assertEqual(ret, 1) diff --git a/tests/test_trigger_manager.py b/tests/test_trigger_manager.py new file mode 100644 index 0000000..7d50dda --- /dev/null +++ b/tests/test_trigger_manager.py @@ -0,0 +1,253 @@ +import unittest2 as unittest + +import mock + +import datetime +import timex + +from winchester import trigger_manager +from winchester import definition +from winchester import db as winch_db + +class TestTriggerManager(unittest.TestCase): + + def setUp(self): + super(TestTriggerManager, self).setUp() + + @mock.patch.object(trigger_manager.ConfigManager, 'wrap') + def test_save_event(self, mock_config_wrap): + tm = trigger_manager.TriggerManager('test') + tm.db = mock.MagicMock(spec=tm.db) + event = dict(message_id='1234-test-5678', + timestamp=datetime.datetime(2014,8,1,10,9,8,77777), + event_type='test.thing', + test_trait="foobar", + other_test_trait=42) + self.assertTrue(tm.save_event(event)) + tm.db.create_event.assert_called_once_with('1234-test-5678', 'test.thing', + datetime.datetime(2014,8,1,10,9,8,77777), dict(test_trait='foobar', other_test_trait=42)) + + @mock.patch.object(trigger_manager.ConfigManager, 'wrap') + def test_save_event_dup(self, mock_config_wrap): + tm = trigger_manager.TriggerManager('test') + tm.db = mock.MagicMock(spec=tm.db) + tm.db.create_event.side_effect = winch_db.DuplicateError("test boom!") + event = dict(message_id='1234-test-5678', + timestamp=datetime.datetime(2014,8,1,10,9,8,77777), + event_type='test.thing', + test_trait="foobar", + other_test_trait=42) + self.assertFalse(tm.save_event(event)) + tm.db.create_event.assert_called_once_with('1234-test-5678', 'test.thing', + datetime.datetime(2014,8,1,10,9,8,77777), dict(test_trait='foobar', other_test_trait=42)) + + @mock.patch('winchester.trigger_manager.EventCondenser', autospec=True) + @mock.patch.object(trigger_manager.ConfigManager, 'wrap') + def test_convert_notification(self, mock_config_wrap, mock_condenser): + tm = trigger_manager.TriggerManager('test') + tm.db = mock.MagicMock(spec=tm.db) + tm.distiller = mock.MagicMock(spec=tm.distiller) + test_event = "I'm a test event!" 
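+ # Both the distiller and condenser are mocked here, so the "event" only + # needs to be a sentinel object we can assert is passed through unchanged.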
+ tm.distiller.to_event.return_value = True + cond = mock_condenser.return_value + cond.validate.return_value = True + cond.get_event.return_value = test_event + tm.save_event = mock.MagicMock() + tm.save_event.return_value = True + + res = tm.convert_notification('test notification here') + mock_condenser.assert_called_once_with(tm.db) + cond.clear.assert_called_once_with() + cond.validate.assert_called_once_with() + tm.distiller.to_event.assert_called_once_with('test notification here', cond) + tm.save_event.assert_called_once_with(test_event) + self.assertEquals(res, test_event) + + @mock.patch('winchester.trigger_manager.EventCondenser', autospec=True) + @mock.patch.object(trigger_manager.ConfigManager, 'wrap') + def test_convert_notification_dropped(self, mock_config_wrap, mock_condenser): + tm = trigger_manager.TriggerManager('test') + tm.db = mock.MagicMock(spec=tm.db) + tm.distiller = mock.MagicMock(spec=tm.distiller) + test_event = "I'm a test event!" + tm.distiller.to_event.return_value = False + cond = mock_condenser.return_value + cond.validate.return_value = True + cond.get_event.return_value = test_event + tm.save_event = mock.MagicMock() + tm.save_event.return_value = True + + test_notif = dict(event_type='test.notification.here', message_id='4242-4242') + res = tm.convert_notification(test_notif) + mock_condenser.assert_called_once_with(tm.db) + cond.clear.assert_called_once_with() + self.assertFalse(cond.validate.called) + tm.distiller.to_event.assert_called_once_with(test_notif, cond) + self.assertFalse(tm.save_event.called) + self.assertIsNone(res) + + @mock.patch('winchester.trigger_manager.EventCondenser', autospec=True) + @mock.patch.object(trigger_manager.ConfigManager, 'wrap') + def test_convert_notification_invalid(self, mock_config_wrap, mock_condenser): + tm = trigger_manager.TriggerManager('test') + tm.db = mock.MagicMock(spec=tm.db) + tm.distiller = mock.MagicMock(spec=tm.distiller) + test_event = "I'm a test event!" + tm.distiller.to_event.return_value = True + cond = mock_condenser.return_value + cond.validate.return_value = False + cond.get_event.return_value = test_event + tm.save_event = mock.MagicMock() + tm.save_event.return_value = True + + test_notif = dict(event_type='test.notification.here', message_id='4242-4242') + res = tm.convert_notification(test_notif) + mock_condenser.assert_called_once_with(tm.db) + cond.clear.assert_called_once_with() + cond.validate.assert_called_once_with() + tm.distiller.to_event.assert_called_once_with(test_notif, cond) + self.assertFalse(tm.save_event.called) + self.assertIsNone(res) + + @mock.patch.object(trigger_manager.ConfigManager, 'wrap') + def test_add_or_create_stream(self, mock_config_wrap): + tm = trigger_manager.TriggerManager('test') + tm.db = mock.MagicMock(spec=tm.db) + tm.db.get_active_stream.return_value = 'Existing Stream' + tm.current_time = mock.MagicMock() + trigger_def = mock.MagicMock() + dist_traits = 'some traits' + event = "eventful!" 
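+ # get_active_stream returns an existing stream, so the event should be + # appended to it and no new stream should be created.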
+ + ret = tm._add_or_create_stream(trigger_def, event, dist_traits) + tm.db.get_active_stream.assert_called_once_with(trigger_def.name, dist_traits, + tm.current_time.return_value) + self.assertFalse(tm.db.create_stream.called) + tm.db.add_event_stream.assert_called_once_with(tm.db.get_active_stream.return_value, + event, trigger_def.expiration) + self.assertEqual(ret, tm.db.get_active_stream.return_value) + + @mock.patch.object(trigger_manager.ConfigManager, 'wrap') + def test_add_or_create_stream_create(self, mock_config_wrap): + tm = trigger_manager.TriggerManager('test') + tm.db = mock.MagicMock(spec=tm.db) + tm.db.get_active_stream.return_value = None + tm.current_time = mock.MagicMock() + trigger_def = mock.MagicMock() + dist_traits = 'some traits' + event = "eventful!" + + ret = tm._add_or_create_stream(trigger_def, event, dist_traits) + tm.db.get_active_stream.assert_called_once_with(trigger_def.name, dist_traits, + tm.current_time.return_value) + tm.db.create_stream.assert_called_once_with(trigger_def.name, event, dist_traits, + trigger_def.expiration) + self.assertFalse(tm.db.add_event_stream.called) + self.assertEqual(ret, tm.db.create_stream.return_value) + + @mock.patch.object(trigger_manager.ConfigManager, 'wrap') + def test_ready_to_fire(self, mock_config_wrap): + tm = trigger_manager.TriggerManager('test') + tm.db = mock.MagicMock(spec=tm.db) + tm.current_time = mock.MagicMock() + trigger_def = mock.MagicMock() + test_stream = mock.MagicMock() + + tm._ready_to_fire(test_stream, trigger_def) + trigger_def.get_fire_timestamp.assert_called_once_with(tm.current_time.return_value) + tm.db.stream_ready_to_fire.assert_called_once_with(test_stream, + trigger_def.get_fire_timestamp.return_value) + + @mock.patch.object(trigger_manager.ConfigManager, 'wrap') + def test_add_notification(self, mock_config_wrap): + tm = trigger_manager.TriggerManager('test') + tm.convert_notification = mock.MagicMock() + tm.add_event = mock.MagicMock() + + tm.add_notification("test notification") + tm.convert_notification.assert_called_once_with("test notification") + tm.add_event.assert_called_once_with(tm.convert_notification.return_value) + + @mock.patch.object(trigger_manager.ConfigManager, 'wrap') + def test_add_notification_invalid_or_dropped(self, mock_config_wrap): + tm = trigger_manager.TriggerManager('test') + tm.convert_notification = mock.MagicMock() + tm.add_event = mock.MagicMock() + tm.convert_notification.return_value = None + + tm.add_notification("test notification") + tm.convert_notification.assert_called_once_with("test notification") + self.assertFalse(tm.add_event.called) + + @mock.patch.object(trigger_manager.ConfigManager, 'wrap') + def test_add_event(self, mock_config_wrap): + tm = trigger_manager.TriggerManager('test') + tm.db = mock.MagicMock(spec=tm.db) + tm.trigger_definitions = [mock.MagicMock() for n in range(3)] + m_def = tm.trigger_definitions[2] + tm.trigger_definitions[0].match.return_value = None + tm.trigger_definitions[1].match.return_value = None + event = "test event" + tm._add_or_create_stream = mock.MagicMock() + tm._add_or_create_stream.return_value.fire_timestamp = None + tm._ready_to_fire = mock.MagicMock() + m_def.should_fire.return_value = True + + tm.add_event(event) + for td in tm.trigger_definitions: + td.match.assert_called_once_with(event) + m_def.get_distinguishing_traits.assert_called_once_with(event, m_def.match.return_value) + tm._add_or_create_stream.assert_called_once_with(m_def, event, + m_def.get_distinguishing_traits.return_value) + 
tm.db.get_stream_events.assert_called_once_with(tm._add_or_create_stream.return_value) + m_def.should_fire.assert_called_once_with(tm.db.get_stream_events.return_value) + tm._ready_to_fire.assert_called_once_with(tm._add_or_create_stream.return_value, m_def) + + @mock.patch.object(trigger_manager.ConfigManager, 'wrap') + def test_add_event_on_ready_stream(self, mock_config_wrap): + tm = trigger_manager.TriggerManager('test') + tm.db = mock.MagicMock(spec=tm.db) + tm.trigger_definitions = [mock.MagicMock() for n in range(3)] + m_def = tm.trigger_definitions[2] + tm.trigger_definitions[0].match.return_value = None + tm.trigger_definitions[1].match.return_value = None + event = "test event" + tm._add_or_create_stream = mock.MagicMock() + tm._add_or_create_stream.return_value.fire_timestamp = "Fire!" + tm._ready_to_fire = mock.MagicMock() + m_def.should_fire.return_value = True + + tm.add_event(event) + for td in tm.trigger_definitions: + td.match.assert_called_once_with(event) + m_def.get_distinguishing_traits.assert_called_once_with(event, m_def.match.return_value) + tm._add_or_create_stream.assert_called_once_with(m_def, event, + m_def.get_distinguishing_traits.return_value) + self.assertFalse(tm.db.get_stream_events.called) + self.assertFalse(m_def.should_fire.called) + self.assertFalse(tm._ready_to_fire.called) + + @mock.patch.object(trigger_manager.ConfigManager, 'wrap') + def test_add_event_no_match(self, mock_config_wrap): + tm = trigger_manager.TriggerManager('test') + tm.db = mock.MagicMock(spec=tm.db) + tm.trigger_definitions = [mock.MagicMock() for n in range(3)] + tm.trigger_definitions[0].match.return_value = None + tm.trigger_definitions[1].match.return_value = None + tm.trigger_definitions[2].match.return_value = None + event = "test event" + tm._add_or_create_stream = mock.MagicMock() + tm._add_or_create_stream.return_value.fire_timestamp = "Fire!" 
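+ # fire_timestamp is set to a sentinel, but it should never be inspected: + # no trigger definition matches this event.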
+ tm._ready_to_fire = mock.MagicMock() + + tm.add_event(event) + for td in tm.trigger_definitions: + td.match.assert_called_once_with(event) + for td in tm.trigger_definitions: + self.assertFalse(td.get_distinguishing_traits.called) + self.assertFalse(td.should_fire.called) + self.assertFalse(tm._add_or_create_stream.called) + self.assertFalse(tm.db.get_stream_events.called) + self.assertFalse(tm._ready_to_fire.called) + + diff --git a/winchester/config.py b/winchester/config.py new file mode 100644 index 0000000..4fbebc9 --- /dev/null +++ b/winchester/config.py @@ -0,0 +1,171 @@ +import collections +import logging +import os +import yaml + +logger = logging.getLogger(__name__) + + +class ConfigurationError(Exception): + pass + + +class ConfigItem(object): + def __init__(self, required=False, default=None, help='', multiple=False): + self.help = help + self.required = required + self.multiple = multiple + self.default = self.convert(default) + + def convert(self, item, manager=None): + if not self.multiple: + return item + elif (isinstance(item, collections.Sequence) + and not isinstance(item, basestring)): + return item + else: + return [item] + + +class ConfigSection(collections.Mapping): + def __init__(self, required=True, help='', config_description=None): + self.config_description = config_description + self.help = help + self.required = required + self.default = None + + def convert(self, item, manager): + return manager.wrap(item, self.config_description) + + def __len__(self): + return len(self.config_description) + + def __iter__(self): + return iter(self.config_description) + + def __getitem__(self, key): + return self.config_description[key] + + +class ConfigManager(collections.Mapping): + + @classmethod + def wrap(cls, conf, config_description): + if hasattr(conf, 'check_config'): + wrapped_conf = conf + else: + wrapped_conf = cls(conf, config_description) + return wrapped_conf + + def __init__(self, config_dict, config_description): + self.config_paths = [] + self._configs = dict() + self._description = config_description + self._required = set() + self._defaults = dict() + for k, item in self._description.items(): + if item.required: + self._required.add(k) + if item.default is not None: + self._defaults[k] = item.default + for k, item in config_dict.items(): + if k in self._description: + self._configs[k] = self._description[k].convert(item, self) + else: + self._configs[k] = item + self._keys = set(self._defaults.keys() + self._configs.keys()) + + def __len__(self): + return len(self._keys) + + def __iter__(self): + return iter(self._keys) + + def __getitem__(self, key): + if key in self._configs: + return self._configs[key] + if key in self._defaults: + return self._defaults[key] + raise KeyError(key) + + def add_config_path(self, *args): + for path in args: + if path not in self.config_paths: + self.config_paths.append(path) + + def check_config(self, prefix=''): + if prefix: + prefix = prefix + '/' + for r in self._required: + if r not in self: + msg = "Required Configuration setting %s%s is missing!" 
% (prefix,r) + logger.error(msg) + raise ConfigurationError(msg) + for k, item in self.items(): + if hasattr(item, 'check_config'): + item.check_config(prefix="%s%s" % (prefix,k)) + + @classmethod + def _load_yaml_config(cls, config_data, filename="(unknown)"): + """Load a yaml config file.""" + + try: + config = yaml.safe_load(config_data) + except yaml.YAMLError as err: + if hasattr(err, 'problem_mark'): + mark = err.problem_mark + errmsg = ("Invalid YAML syntax in Configuration file " + "%(file)s at line: %(line)s, column: %(column)s." + % dict(file=filename, + line=mark.line + 1, + column=mark.column + 1)) + else: + errmsg = ("YAML error reading Configuration file " + "%(file)s" + % dict(file=filename)) + logger.error(errmsg) + raise + + logger.info("Configuration: %s", config) + return config + + @classmethod + def _load_file(cls, filename, paths): + for path in paths: + fullpath = os.path.join(path, filename) + if os.path.isfile(fullpath): + with open(fullpath, 'r') as cf: + logger.debug("Loading configuration file: %s", fullpath) + return cf.read() + msg = "Unable to find file %s in %s" % (filename, str(paths)) + logger.info(msg) + return None + + @classmethod + def load_config_file(cls, filename, filetype=None, paths=None): + if not paths: + paths = ['.'] + if filetype is None: + if (filename.lower().endswith('.yaml') or + filename.lower().endswith('.yml')): + filetype = 'yaml' + elif filename.lower().endswith('.json'): + filetype = 'json' + elif (filename.lower().endswith('.conf') or + filename.lower().endswith('.ini')): + filetype = 'ini' + else: + filetype = 'yaml' + data = cls._load_file(filename, paths) + if data is None: + raise ConfigurationError("Cannot find or read config file: %s" % filename) + try: + loader = getattr(cls, "_load_%s_config" % filetype) + except AttributeError: + raise ConfigurationError("Unknown config file type: %s" % filetype) + return loader(data, filename=filename) + + def load_file(self, filename, filetype=None): + return self.load_config_file(filename, filetype, paths=self.config_paths) + + diff --git a/winchester/db.py b/winchester/db.py index fe1d7fa..0fd75bc 100644 --- a/winchester/db.py +++ b/winchester/db.py @@ -1,43 +1,59 @@ +from contextlib import contextmanager +import logging import sqlalchemy +from sqlalchemy import and_, or_ +from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import sessionmaker from winchester import models +from winchester.config import ConfigManager, ConfigSection, ConfigItem +logger = logging.getLogger(__name__) + ENGINES = dict() SESSIONMAKERS = dict() +class DuplicateError(models.DBException): + pass + + +class LockError(models.DBException): + pass + + def sessioned(func): def with_session(self, *args, **kw): if 'session' in kw: return func(self, *args, **kw) else: - try: - session = self.get_session() + with self.in_session() as session: kw['session'] = session retval = func(self, *args, **kw) - session.commit() - return retval - except: - session.rollback() - raise - finally: - session.close() + return retval return with_session class DBInterface(object): + + @classmethod + def config_description(cls): + return dict(url=ConfigItem(required=True, + help="Connection URL for database."), + ) + def __init__(self, config): - self.config = config + self.config = ConfigManager.wrap(config, self.config_description()) self.db_url = config['url'] + self.echo_sql = config.get('echo_sql', False) @property def engine(self): global ENGINES if self.db_url not in ENGINES: - engine = 
sqlalchemy.create_engine(self.db_url) + engine = sqlalchemy.create_engine(self.db_url, echo=self.echo_sql) ENGINES[self.db_url] = engine return ENGINES[self.db_url] @@ -49,9 +65,31 @@ class DBInterface(object): SESSIONMAKERS[self.db_url] = maker return SESSIONMAKERS[self.db_url] + def close(self): + if self.db_url in ENGINES: + del ENGINES[self.db_url] + if self.db_url in SESSIONMAKERS: + del SESSIONMAKERS[self.db_url] + def get_session(self): return self.sessionmaker(expire_on_commit=False) + @contextmanager + def in_session(self): + """Provide a session scope around a series of operations.""" + session = self.get_session() + try: + yield session + session.commit() + except IntegrityError: + session.rollback() + raise DuplicateError("Duplicate unique value detected!") + except: + session.rollback() + raise + finally: + session.close() + @sessioned def get_event_type(self, description, session=None): t = session.query(models.EventType).filter(models.EventType.desc == description).first() @@ -67,4 +105,103 @@ class DBInterface(object): for name in traits: e[name] = traits[name] session.add(e) + + @sessioned + def get_event_by_message_id(self, message_id, session=None): + e = session.query(models.Event).\ + filter(models.Event.message_id == message_id).one() + return e.as_dict + + @sessioned + def get_stream_by_id(self, stream_id, session=None): + s = session.query(models.Stream).\ + filter(models.Stream.id == stream_id).one() + return s + + @sessioned + def create_stream(self, trigger_name, initial_event, dist_traits, expire_expr, session=None): + first_event_time = initial_event['timestamp'] + s = models.Stream(trigger_name, first_event_time) + for trait_name in dist_traits: + s[trait_name] = dist_traits[trait_name] + session.add(s) + self.add_event_stream(s, initial_event, expire_expr, session=session) + return s + + @sessioned + def stream_has_dist_trait(self, stream_id, name, value=None, session=None): + q = session.query(models.DistinguishingTrait) + q = q.filter(models.DistinguishingTrait.stream_id == stream_id) + q = q.filter(models.DistinguishingTrait.name == name) + if value is not None: + q = q.filter(models.DistinguishingTrait.value == value) + dt = q.first() + if dt is not None: + dt = dt.as_dict + return dt + + @sessioned + def get_stream_events(self, stream, session=None): + if stream not in session: + stream = session.merge(stream) + return [event.as_dict for event in stream.events] + + @sessioned + def add_event_stream(self, stream, event, expire_expr, session=None): + if stream not in session: + session.add(stream) + message_id = event['message_id'] + timestamp = event['timestamp'] + if timestamp < stream.first_event: + stream.first_event = timestamp + if timestamp > stream.last_event: + stream.last_event = timestamp + stream.expire_timestamp = expire_expr(first=stream.first_event, + last=stream.last_event).timestamp + eq = session.query(models.Event) + eq = eq.filter(models.Event.message_id == message_id) + e = eq.one() + stream.events.append(e) return e + + @sessioned + def get_active_stream(self, name, dist_traits, current_time, session=None): + q = session.query(models.Stream) + q = q.filter(models.Stream.name == name) + q = q.filter(models.Stream.state == int(models.StreamState.active)) + q = q.filter(models.Stream.expire_timestamp > current_time) + for name, val in dist_traits.items(): + q = q.filter(models.Stream.distinguished_by.any(and_( + models.DistinguishingTrait.name == name, + models.DistinguishingTrait.value == val))) + return q.first() + + @sessioned + def 
stream_ready_to_fire(self, stream, timestamp, session=None): + if stream not in session: + session.add(stream) + stream.fire_timestamp = timestamp + + @sessioned + def get_ready_streams(self, batch_size, current_time, expire=False, session=None): + q = session.query(models.Stream) + q = q.filter(models.Stream.state == int(models.StreamState.active)) + if expire: + q = q.filter(models.Stream.expire_timestamp < current_time) + else: + q = q.filter(models.Stream.fire_timestamp < current_time) + q = q.limit(batch_size) + return q.all() + + def set_stream_state(self, stream, state): + serial = stream.state_serial_no + stream_id = stream.id + #we do this in a separate session, as it needs to be atomic. + with self.in_session() as session: + q = session.query(models.Stream) + q = q.filter(models.Stream.id == stream_id) + q = q.filter(models.Stream.state_serial_no == serial) + ct = q.update(dict(state=int(state), state_serial_no=serial + 1)) + if ct != 1: + raise LockError("Optimistic Lock failed!") + return self.get_stream_by_id(stream_id) diff --git a/winchester/definition.py b/winchester/definition.py new file mode 100644 index 0000000..c83de23 --- /dev/null +++ b/winchester/definition.py @@ -0,0 +1,250 @@ +import logging +import collections +import datetime +import six +import timex +import fnmatch + + +logger = logging.getLogger(__name__) + + +class DefinitionError(Exception): + pass + + +def filter_event_timestamps(event): + return dict((trait, value) for trait, value in event.items() + if isinstance(value, datetime.datetime)) + + +class Criterion(object): + + @classmethod + def get_from_expression(cls, expression, trait_name): + if isinstance(expression, collections.Mapping): + if len(expression) != 1: + raise DefinitionError("Only exactly one type of match is allowed per criterion expression") + ctype = expression.keys()[0] + expr = expression[ctype] + if ctype == 'int': + return NumericCriterion(expr, trait_name) + elif ctype =='float': + return FloatCriterion(expr, trait_name) + elif ctype == 'datetime': + return TimeCriterion(expr, trait_name) + elif ctype == 'string' or ctype == 'text': + return Criterion(expr, trait_name) + else: + # A constant. 
-mdragon + return Criterion(expression, trait_name) + + def __init__(self, expr, trait_name): + self.trait_name = trait_name + #match a constant + self.op = '=' + self.value = expr + + def match(self, event): + if self.trait_name not in event: + return False + value = event[self.trait_name] + if self.op == '=': + return value == self.value + elif self.op == '>': + return value > self.value + elif self.op == '<': + return value < self.value + + +class NumericCriterion(Criterion): + + def __init__(self, expr, trait_name): + self.trait_name = trait_name + if not isinstance(expr, six.string_types): + self.op = '=' + self.value = expr + else: + expr = expr.strip().split(None, 1) + if len(expr) == 2: + self.op = expr[0] + value = expr[1].strip() + elif len(expr) == 1: + self.op = '=' + value = expr[0] + else: + raise DefinitionError('Invalid numeric criterion.') + try: + self.value = self._convert(value) + except ValueError: + raise DefinitionError('Invalid numeric criterion.') + + def _convert(self, value): + return int(value) + + +class FloatCriterion(NumericCriterion): + + def _convert(self, value): + return float(value) + + +class TimeCriterion(Criterion): + + def __init__(self, expression, trait_name): + self.trait_name = trait_name + self.time_expr = timex.parse(expression) + + def match(self, event): + if self.trait_name not in event: + return False + value = event[self.trait_name] + try: + timerange = self.time_expr(**filter_event_timestamps(event)) + except timex.TimexExpressionError: + # the event doesn't contain a trait referenced in the expression. + return False + return value in timerange + + +class Criteria(object): + def __init__(self, config): + self.included_types = [] + self.excluded_types = [] + if 'event_type' in config: + event_types = config['event_type'] + if isinstance(event_types, six.string_types): + event_types = [event_types] + for t in event_types: + if t.startswith('!'): + self.excluded_types.append(t[1:]) + else: + self.included_types.append(t) + else: + self.included_types.append('*') + if self.excluded_types and not self.included_types: + self.included_types.append('*') + if 'number' in config: + self.number = config['number'] + else: + self.number = 1 + if 'timestamp' in config: + self.timestamp = timex.parse(config['timestamp']) + else: + self.timestamp = None + self.map_distinguished_by = dict() + if 'map_distinguished_by' in config: + self.map_distinguished_by = config['map_distinguished_by'] + self.traits = dict() + if 'traits' in config: + for trait, criterion in config['traits'].items(): + self.traits[trait] = Criterion.get_from_expression(criterion, trait) + + def included_type(self, event_type): + return any(fnmatch.fnmatch(event_type, t) for t in self.included_types) + + def excluded_type(self, event_type): + return any(fnmatch.fnmatch(event_type, t) for t in self.excluded_types) + + def match_type(self, event_type): + return (self.included_type(event_type) + and not self.excluded_type(event_type)) + + def match(self, event): + if not self.match_type(event['event_type']): + return False + if self.timestamp: + try: + t = self.timestamp(**filter_event_timestamps(event)) + except timex.TimexExpressionError: + # the event doesn't contain a trait referenced in the expression. 
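+ # Treat that as a non-match rather than an error, so one malformed event + # cannot abort trigger evaluation.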
+ return False + if event['timestamp'] not in t: + return False + if not self.traits: + return True + return all(criterion.match(event) for + criterion in self.traits.values()) + + +class TriggerDefinition(object): + + def __init__(self, config): + if 'name' not in config: + raise DefinitionError("Required field 'name' not specified " + "in trigger definition") + self.name = config['name'] + self.distinguished_by = config.get('distinguished_by', []) + for dt in self.distinguished_by: + if isinstance(dt, collections.Mapping): + if len(dt) > 1: + raise DefinitionError("Invalid distinguishing expression " + "%s. Only one trait allowed in an expression" % str(dt)) + self.fire_delay = config.get('fire_delay', 0) + if 'expiration' not in config: + raise DefinitionError("Required field 'expiration' not specified " + "in trigger definition") + self.expiration = timex.parse(config['expiration']) + self.fire_pipeline = config.get('fire_pipeline') + self.expire_pipeline = config.get('expire_pipeline') + if not self.fire_pipeline and not self.expire_pipeline: + raise DefinitionError("At least one of: 'fire_pipeline' or " + "'expire_pipeline' must be specified in a " + "trigger definition.") + if 'fire_criteria' not in config: + raise DefinitionError("Required criteria 'fire_criteria' not " + "specified in trigger definition") + self.fire_criteria = [Criteria(c) for c in config['fire_criteria']] + if 'match_criteria' not in config: + raise DefinitionError("Required criteria 'match_criteria' not " + "specified in trigger definition") + self.match_criteria = [Criteria(c) for c in config['match_criteria']] + self.load_criteria = [] + if 'load_criteria' in config: + self.load_criteria = [Criteria(c) for c in config['load_criteria']] + + def match(self, event): + # all distinguishing traits must exist to match.
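+ # Distinguishing traits identify the stream an event belongs to; an event + # missing one of them cannot be assigned to any stream, so it cannot match.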
+ for dt in self.distinguished_by: + if isinstance(dt, collections.Mapping): + trait_name = dt.keys()[0] + else: + trait_name = dt + if trait_name not in event: + return None + for criteria in self.match_criteria: + if criteria.match(event): + return criteria + return None + + def get_distinguishing_traits(self, event, matching_criteria): + dist_traits = dict() + for dt in self.distinguished_by: + d_expr = None + if isinstance(dt, collections.Mapping): + trait_name = dt.keys()[0] + d_expr = timex.parse(dt[trait_name]) + else: + trait_name = dt + event_trait_name = matching_criteria.map_distinguished_by.get(trait_name, trait_name) + if d_expr is not None: + dist_traits[trait_name] = d_expr(timestamp=event[event_trait_name]) + else: + dist_traits[trait_name] = event[event_trait_name] + return dist_traits + + def get_fire_timestamp(self, timestamp): + return timestamp + datetime.timedelta(seconds=self.fire_delay) + + def should_fire(self, events): + for criteria in self.fire_criteria: + matches = 0 + for event in events: + if criteria.match(event): + matches += 1 + if matches >= criteria.number: + break + if matches < criteria.number: + return False + return True + diff --git a/winchester/models.py b/winchester/models.py index d5b6b3f..d29c482 100644 --- a/winchester/models.py +++ b/winchester/models.py @@ -1,8 +1,12 @@ from datetime import datetime - +from decimal import Decimal +import calendar from enum import IntEnum +import timex + from sqlalchemy import event +from sqlalchemy import and_, or_ from sqlalchemy import literal_column from sqlalchemy import Column, Table, ForeignKey, Index, UniqueConstraint from sqlalchemy import Float, Boolean, Text, DateTime, Integer, String @@ -12,6 +16,7 @@ from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.dialects.mysql import DECIMAL from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.associationproxy import association_proxy +from sqlalchemy.orm import composite from sqlalchemy.orm import backref from sqlalchemy.orm import relationship from sqlalchemy.orm.collections import attribute_mapped_collection @@ -24,6 +29,33 @@ class Datatype(IntEnum): int = 2 float = 3 datetime = 4 + timerange = 5 + + +class StreamState(IntEnum): + active = 1 + firing = 2 + expiring = 3 + error = 4 + expire_error = 5 + completed = 6 + + +class DBException(Exception): + pass + + +class InvalidTraitType(DBException): + pass + + +def dt_to_decimal(dt): + t_sec = calendar.timegm(dt.utctimetuple()) + (dt.microsecond/1e6) + return Decimal("%.6f" % t_sec) + + +def decimal_to_dt(decimal_timestamp): + return datetime.utcfromtimestamp(float(decimal_timestamp)) class PreciseTimestamp(TypeDecorator): @@ -42,17 +74,37 @@ class PreciseTimestamp(TypeDecorator): if value is None: return value elif dialect.name == 'mysql': - return utils.dt_to_decimal(value) + return dt_to_decimal(value) return value def process_result_value(self, value, dialect): if value is None: return value elif dialect.name == 'mysql': - return utils.decimal_to_dt(value) + return decimal_to_dt(value) return value +class DBTimeRange(object): + def __init__(self, begin, end): + self.begin = begin + self.end = end + + def __composite_values__(self): + return self.begin, self.end + + def __repr__(self): + return "DBTimeRange(begin=%r, end=%r)" % (self.begin, self.end) + + def __eq__(self, other): + return isinstance(other, DBTimeRange) and \ + other.begin == self.begin and \ + other.end == self.end + + def __ne__(self, other): + return not self.__eq__(other) + + class 
ProxiedDictMixin(object): """Adds obj[name] access to a mapped class. @@ -84,26 +136,53 @@ class ProxiedDictMixin(object): class PolymorphicVerticalProperty(object): """A name/value pair with polymorphic value storage.""" + ATTRIBUTE_MAP = {Datatype.none: None} + PY_TYPE_MAP = {unicode: Datatype.string, + int: Datatype.int, + float: Datatype.float, + datetime: Datatype.datetime, + DBTimeRange: Datatype.timerange} + def __init__(self, name, value=None): self.name = name self.value = value + @classmethod + def get_type_value(cls, value): + if value is None: + return Datatype.none, None + if isinstance(value, str): + value = value.decode('utf8', 'ignore') + if isinstance(value, timex.Timestamp): + value = value.timestamp + if isinstance(value, timex.TimeRange): + value = DBTimeRange(value.begin, value.end) + if type(value) in cls.PY_TYPE_MAP: + return cls.PY_TYPE_MAP[type(value)], value + return None, value + @hybrid_property def value(self): - fieldname, discriminator = self.type_map[self.type] - if fieldname is None: + if self.type not in self.ATTRIBUTE_MAP: + raise InvalidTraitType("Invalid trait type in db for %s: %s" % (self.name, self.type)) + attribute = self.ATTRIBUTE_MAP[self.type] + if attribute is None: return None + if self.type == Datatype.timerange: + val = getattr(self, attribute) + return timex.TimeRange(val.begin, val.end) else: - return getattr(self, fieldname) + return getattr(self, attribute) @value.setter def value(self, value): - py_type = type(value) - fieldname, discriminator = self.type_map[py_type] - - self.type = discriminator - if fieldname is not None: - setattr(self, fieldname, value) + datatype, value = self.get_type_value(value) + if datatype not in self.ATTRIBUTE_MAP: + raise InvalidTraitType("Invalid trait type for %s: %s" % (self.name, datatype)) + attribute = self.ATTRIBUTE_MAP[datatype] + self.type = int(datatype) + if attribute is not None: + setattr(self, attribute, value) @value.deleter def value(self): @@ -111,49 +190,34 @@ class PolymorphicVerticalProperty(object): @value.comparator class value(PropComparator): - """A comparator for .value, builds a polymorphic comparison via CASE. + """A comparator for .value, builds a polymorphic comparison. """ def __init__(self, cls): self.cls = cls - def _case(self): - pairs = set(self.cls.type_map.values()) - whens = [ - ( - literal_column("'%s'" % discriminator), - cast(getattr(self.cls, attribute), String) - ) for attribute, discriminator in pairs - if attribute is not None - ] - return case(whens, self.cls.type, null()) def __eq__(self, other): - return self._case() == cast(other, String) + dtype, value = self.cls.get_type_value(other) + if dtype is None: + dtype = Datatype.string + if dtype == Datatype.none: + return self.cls.type == int(Datatype.none) + attr = getattr(self.cls, self.cls.ATTRIBUTE_MAP[dtype]) + return and_(attr == value, self.cls.type == int(dtype)) + def __ne__(self, other): - return self._case() != cast(other, String) + dtype, value = self.cls.get_type_value(other) + if dtype is None: + dtype = Datatype.string + if dtype == Datatype.none: + return self.cls.type != int(Datatype.none) + attr = getattr(self.cls, self.cls.ATTRIBUTE_MAP[dtype]) + return and_(attr != value, self.cls.type == int(dtype)) def __repr__(self): return '<%s %r=%r>' % (self.__class__.__name__, self.name, self.value) -@event.listens_for(PolymorphicVerticalProperty, "mapper_configured", propagate=True) -def on_new_class(mapper, cls_): - """Add type lookup info for polymorphic value columns. 
- """ - - info_dict = {} - info_dict[type(None)] = (None, Datatype.none) - info_dict[Datatype.none] = (None, Datatype.none) - - for k in mapper.c.keys(): - col = mapper.c[k] - if 'type' in col.info: - python_type, discriminator = col.info['type'] - info_dict[python_type] = (k, discriminator) - info_dict[discriminator] = (k, discriminator) - cls_.type_map = info_dict - - Base = declarative_base() @@ -169,14 +233,16 @@ class Trait(PolymorphicVerticalProperty, Base): name = Column(String(100), primary_key=True) type = Column(Integer) + ATTRIBUTE_MAP = {Datatype.none: None, + Datatype.string: 't_string', + Datatype.int: 't_int', + Datatype.float: 't_float', + Datatype.datetime: 't_datetime',} - t_string = Column(String(255), info=dict(type=(str, Datatype.string)), - nullable=True, default=None) - t_float = Column(Float, info=dict(type=(float, Datatype.float)), - nullable=True, default=None) - t_int = Column(Integer, info=dict(type=(int, Datatype.int)), - nullable=True, default=None) - t_datetime = Column(PreciseTimestamp(), info=dict(type=(datetime, Datatype.datetime)), + t_string = Column(String(255), nullable=True, default=None) + t_float = Column(Float, nullable=True, default=None) + t_int = Column(Integer, nullable=True, default=None) + t_datetime = Column(PreciseTimestamp(), nullable=True, default=None) def __repr__(self): @@ -222,16 +288,130 @@ class Event(ProxiedDictMixin, Base): _proxied = association_proxy("traits", "value", creator=lambda name, value: Trait(name=name, value=value)) + @property + def event_type_string(self): + return self.event_type.desc + + @property + def as_dict(self): + d = dict(self._proxied) + d['message_id'] = self.message_id + d['event_type'] = self.event_type_string + d['timestamp'] = self.generated + return d + def __init__(self, message_id, event_type, generated): - self.message_id = message_id - self.event_type = event_type - self.generated = generated + self.message_id = message_id + self.event_type = event_type + self.generated = generated def __repr__(self): - return "" % (self.id, + return "" % (self.id, self.message_id, self.event_type, self.generated) +stream_event_table = Table('streamevent', Base.metadata, + Column('stream_id', Integer, ForeignKey('stream.id'), primary_key=True), + Column('event_id', Integer, + ForeignKey('event.id'), + primary_key=True) + ) + + +class Stream(ProxiedDictMixin, Base): + __tablename__ = 'stream' + + __table_args__ = ( + Index('ix_stream_name', 'name'), + Index('ix_stream_state', 'state'), + Index('ix_stream_expire_timestamp', 'expire_timestamp'), + Index('ix_stream_fire_timestamp', 'fire_timestamp') + ) + id = Column(Integer, primary_key=True) + first_event = Column(PreciseTimestamp(), nullable=False) + last_event = Column(PreciseTimestamp(), nullable=False) + expire_timestamp = Column(PreciseTimestamp()) + fire_timestamp = Column(PreciseTimestamp()) + name = Column(String(255), nullable=False) + state = Column(Integer, default=StreamState.active, nullable=False) + state_serial_no = Column(Integer, default=0, nullable=False) + + distinguished_by = relationship("DistinguishingTrait", + collection_class=attribute_mapped_collection('name')) + _proxied = association_proxy("distinguished_by", "value", + creator=lambda name, value: DistinguishingTrait(name=name, value=value)) + + events = relationship(Event, secondary=stream_event_table, + order_by=Event.generated) + + @property + def distinguished_by_dict(self): + return dict(self._proxied) + + def __init__(self, name, first_event, last_event=None, expire_timestamp=None, 
+ fire_timestamp=None, state=None, state_serial_no=None): + self.name = name + self.first_event = first_event + if last_event is None: + last_event = first_event + self.last_event = last_event + self.expire_timestamp = expire_timestamp + self.fire_timestamp = fire_timestamp + if state is None: + state = StreamState.active + self.state = state + if state_serial_no is None: + state_serial_no = 0 + self.state_serial_no = state_serial_no + + +class DistinguishingTrait(PolymorphicVerticalProperty, Base): + __tablename__ = 'dist_trait' + __table_args__ = ( + Index('ix_dist_trait_dt_int', 'dt_int'), + Index('ix_dist_trait_dt_float', 'dt_float'), + Index('ix_dist_trait_dt_string', 'dt_string'), + Index('ix_dist_trait_dt_datetime', 'dt_datetime'), + Index('ix_dist_trait_dt_timerange_begin', 'dt_timerange_begin'), + Index('ix_dist_trait_dt_timerange_end', 'dt_timerange_end'), + ) + stream_id = Column(Integer, ForeignKey('stream.id'), primary_key=True) + name = Column(String(100), primary_key=True) + type = Column(Integer) + + + ATTRIBUTE_MAP = {Datatype.none: None, + Datatype.string: 'dt_string', + Datatype.int: 'dt_int', + Datatype.float: 'dt_float', + Datatype.datetime: 'dt_datetime', + Datatype.timerange: 'dt_timerange', + } + + dt_string = Column(String(255), nullable=True, default=None) + dt_float = Column(Float, nullable=True, default=None) + dt_int = Column(Integer, nullable=True, default=None) + dt_datetime = Column(PreciseTimestamp(), + nullable=True, default=None) + dt_timerange_begin = Column(PreciseTimestamp(), nullable=True, default=None) + dt_timerange_end = Column(PreciseTimestamp(), nullable=True, default=None) + + dt_timerange = composite(DBTimeRange, dt_timerange_begin, dt_timerange_end) + + @property + def as_dict(self): + return {self.name: self.value} + + def __repr__(self): + return "<DistinguishingTrait(%s) type: %s, string: %s, float: %s, int: %s, datetime: %s, timerange: [%s, %s], stream_id: %s>" % (self.name, + self.type, + self.dt_string, + self.dt_float, + self.dt_int, + self.dt_datetime, + self.dt_timerange_begin, + self.dt_timerange_end, + self.stream_id) diff --git a/winchester/pipeline_handler.py b/winchester/pipeline_handler.py new file mode 100644 index 0000000..e934a59 --- /dev/null +++ b/winchester/pipeline_handler.py @@ -0,0 +1,92 @@ +import abc +import logging +import six + + +logger = logging.getLogger(__name__) + + +@six.add_metaclass(abc.ABCMeta) +class PipelineHandlerBase(object): + """Base class for Pipeline handlers. + + Pipeline handlers perform the actual processing on a set of events + captured by a stream. The handlers are chained together, each handler + in a pipeline is called in order, and receives the output of the previous + handler. + + Once all of the handlers in a pipeline have successfully processed the + events (with .handle_events() ), each handler's .commit() method will be + called. If any handler in the chain raises an exception, processing of + events will stop, and each handler's .rollback() method will be called.""" + + def __init__(self, **kw): + """Set up the pipeline handler. + + A new instance of each handler for a pipeline is used for each + stream (set of events) processed. + + :param kw: The parameters listed in the pipeline config file for + this handler (if any). + """ + + @abc.abstractmethod + def handle_events(self, events, env): + """ This method handles the actual event processing. + + This method receives a list of events and should return a list of + events as well. The return value of this method will be passed to + the next handler's .handle_events() method. Handlers can add new + events simply by adding them to the list they return. 
New events + (those with unrecognized message_ids) will be saved to the + database if all handlers in this pipeline complete successfully. + Likewise, handlers can omit events from the list they return to + act as a filter for downstream handlers. + + Care should be taken to avoid any operation with side-effects in + this method. Pipelines can be re-tried if a handler throws an + error. If you need to perform such operations, such as interacting + with an external system, save the needed information in an instance + variable, and perform the operation in the .commit() method. + + :param events: A list of events. + :param env: Just a dictionary, it's passed to each handler, and + can act as a shared scratchpad. + + :returns: A list of events. + """ + + @abc.abstractmethod + def commit(self): + """ Called when each handler in this pipeline has successfully + completed. + + If you have operations with side effects, perform them here. + Exceptions raised here will be logged, but otherwise ignored. + """ + + @abc.abstractmethod + def rollback(self): + """ Called if there is an error for any handler while processing a list + of events. + + If you need to perform some kind of cleanup, do it here. + Exceptions raised here will be logged, but otherwise ignored. + """ + + +class LoggingHandler(PipelineHandlerBase): + + def handle_events(self, events, env): + emsg = ', '.join("%s: %s" % (event['event_type'], event['message_id']) + for event in events) + logger.info("Received %s events: \n%s" % (len(events), emsg)) + return events + + def commit(self): + pass + + def rollback(self): + pass + + diff --git a/winchester/pipeline_manager.py b/winchester/pipeline_manager.py new file mode 100644 index 0000000..24370ae --- /dev/null +++ b/winchester/pipeline_manager.py @@ -0,0 +1,301 @@ +import datetime +import time +import logging +import random +import simport +import six + +from winchester.db import DBInterface, DuplicateError, LockError +from winchester.config import ConfigManager, ConfigSection, ConfigItem +from winchester.definition import TriggerDefinition +from winchester.models import StreamState + +logger = logging.getLogger(__name__) + + +class PipelineError(Exception): + pass + + +class PipelineExecutionError(PipelineError): + def __init__(self, msg="", cause=None): + super(PipelineExecutionError, self).__init__("%s: caused by %s" % (msg, repr(cause))) + self.cause = cause + + +class PipelineConfigError(PipelineError): + pass + + +class Pipeline(object): + + @classmethod + def check_handler_config(cls, conf, handler_map): + if isinstance(conf, six.string_types): + conf = dict(name=conf, params=dict()) + if 'name' not in conf: + raise PipelineConfigError("Handler name not in config! 
%s" % str(conf)) + if 'params' not in conf: + conf['params'] = {} + if conf['name'] not in handler_map: + raise PipelineConfigError("Unknown handler in pipeline config %s" % conf['name']) + return conf + + def __init__(self, name, config, handler_map): + self.name = name + self.handlers = [] + self.env = dict() + for handler_conf in config: + name = handler_conf['name'] + params = handler_conf['params'] + handler_class = handler_map[name] + try: + handler = handler_class(**params) + except Exception as e: + logger.exception("Error initalizing handler %s for pipeline %s" % + handler_class, self.name) + raise PipelineExecutionError("Error loading pipeline", e) + self.handlers.append(handler) + + def handle_events(self, events): + event_ids = set(e['message_id'] for e in events) + try: + for handler in self.handlers: + events = handler.handle_events(events, self.env) + except Exception as e: + logger.exception("Error processing pipeline %s" % self.name) + self.rollback() + raise PipelineExecutionError("Error in pipeline", e) + new_events = [e for e in events if e['message_id'] not in event_ids] + self.commit() + return new_events + + def commit(self): + for handler in self.handlers: + try: + handler.commit() + except: + logger.exception("Commit error on handler in pipeline %s" % self.name) + + def rollback(self): + for handler in self.handlers: + try: + handler.rollback() + except: + logger.exception("Rollback error on handler in pipeline %s" % self.name) + + +class PipelineManager(object): + + @classmethod + def config_description(cls): + return dict(config_path=ConfigItem(help="Path(s) to find additional config files", + multiple=True, default='.'), + pipeline_handlers=ConfigItem(required=True, + help="dictionary of pipeline handlers to load " + "Classes specified with simport syntax. 
" + "simport docs for more info"), + statistics_period=ConfigItem(help="Emit stats on event counts, etc every " + "this many seconds", default=10), + pipeline_worker_batch_size=ConfigItem(help="Number of streams for pipeline " + "worker(s) to load at a time", default=1000), + pipeline_worker_delay=ConfigItem(help="Number of seconds for pipeline worker to sleep " + "when it finds no streams to process", default=10), + database=ConfigSection(help="Database connection info.", + config_description=DBInterface.config_description()), + trigger_definitions=ConfigItem(required=True, + help="Name of trigger definitions file " + "defining trigger conditions and what events to " + "process for each stream"), + pipeline_config=ConfigItem(required=True, + help="Name of pipeline config file " + "defining the handlers for each pipeline."), + ) + + def __init__(self, config, db=None, pipeline_handlers=None, pipeline_config=None, trigger_defs=None): + logger.debug("PipelineManager: Using config: %s" % str(config)) + config = ConfigManager.wrap(config, self.config_description()) + self.config = config + config.check_config() + config.add_config_path(*config['config_path']) + if db is not None: + self.db = db + else: + self.db = DBInterface(config['database']) + + if pipeline_handlers is not None: + self.pipeline_handlers = pipeline_handlers + else: + self.pipeline_handlers = self._load_plugins(config['pipeline_handlers']) + logger.debug("Pipeline handlers: %s" % str(self.pipeline_handlers)) + + if pipeline_config is not None: + self.pipeline_config = pipeline_config + else: + self.pipeline_config = config.load_file(config['pipeline_config']) + + logger.debug("Pipeline config: %s" % str(self.pipeline_config)) + for pipeline, handler_configs in self.pipeline_config.items(): + self.pipeline_config[pipeline] = [Pipeline.check_handler_config(conf, + self.pipeline_handlers) + for conf in handler_configs] + + if trigger_defs is not None: + self.trigger_definitions = trigger_defs + else: + defs = config.load_file(config['trigger_definitions']) + logger.debug("Loaded trigger definitions %s" % str(defs)) + self.trigger_definitions = [TriggerDefinition(conf) for conf in defs] + self.trigger_map = dict((tdef.name, tdef) for tdef in self.trigger_definitions) + + self.pipeline_worker_batch_size = config['pipeline_worker_batch_size'] + self.pipeline_worker_delay = config['pipeline_worker_delay'] + self.statistics_period = config['statistics_period'] + self.streams_fired = 0 + self.streams_expired = 0 + self.streams_loaded = 0 + self.last_status = self.current_time() + + @classmethod + def _load_plugins(cls, plug_map, defaults=None): + plugins = dict() + if defaults is not None: + plugins.update(defaults) + for name, cls_string in plug_map.items(): + try: + plugins[name] = simport.load(cls_string) + except simport.ImportFailed as e: + log.error("Could not load plugin %s: Import failed. %s" % ( + name, e)) + except (simport.MissingMethodOrFunction, + simport.MissingModule, + simport.BadDirectory) as e: + log.error("Could not load plugin %s: Not found. %s" % ( + name, e)) + return plugins + + def current_time(self): + # here so it's easily overridden. + return datetime.datetime.utcnow() + + def _log_statistics(self): + logger.info("Loaded %s streams. Fired %s, Expired %s." 
% ( + self.streams_loaded, self.streams_fired, self.streams_expired)) + self.streams_fired = 0 + self.streams_expired = 0 + self.streams_loaded = 0 + self.last_status = self.current_time() + + def add_new_events(self, events): + pass + + def _run_pipeline(self, stream, trigger_def, pipeline_name, pipeline_config): + events = self.db.get_stream_events(stream) + try: + pipeline = Pipeline(pipeline_name, pipeline_config, self.pipeline_handlers) + new_events = pipeline.handle_events(events) + except PipelineExecutionError: + logger.error("Exception in pipeline %s handling stream %s" % ( + pipeline_name, stream.id)) + return False + if new_events: + self.add_new_events(new_events) + return True + + def _complete_stream(self, stream): + self.db.set_stream_state(stream, StreamState.completed) + + def _error_stream(self, stream): + self.db.set_stream_state(stream, StreamState.error) + + def _expire_error_stream(self, stream): + self.db.set_stream_state(stream, StreamState.expire_error) + + def fire_stream(self, stream): + try: + stream = self.db.set_stream_state(stream, StreamState.firing) + except LockError: + logger.debug("Stream %s locked. Moving on..." % stream.id) + return False + trigger_def = self.trigger_map.get(stream.name) + if trigger_def is None: + logger.error("Stream %s has unknown trigger definition %s" % ( + stream.id, stream.name)) + self._error_stream(stream) + return False + pipeline = trigger_def.fire_pipeline + if pipeline is not None: + pipe_config = self.pipeline_config.get(pipeline) + if pipe_config is None: + logger.error("Trigger %s for stream %s has unknown pipeline %s" % ( + stream.name, stream.id, pipeline)) + self._error_stream(stream) + return False + if not self._run_pipeline(stream, trigger_def, pipeline, pipe_config): + self._error_stream(stream) + return False + else: + logger.debug("No fire pipeline for stream %s. Nothing to do." % ( + stream.id)) + self._complete_stream(stream) + self.streams_fired += 1 + return True + + def expire_stream(self, stream): + try: + stream = self.db.set_stream_state(stream, StreamState.expiring) + except LockError: + logger.debug("Stream %s locked. Moving on..." % stream.id) + return False + trigger_def = self.trigger_map.get(stream.name) + if trigger_def is None: + logger.error("Stream %s has unknown trigger definition %s" % ( + stream.id, stream.name)) + self._expire_error_stream(stream) + return False + pipeline = trigger_def.expire_pipeline + if pipeline is not None: + pipe_config = self.pipeline_config.get(pipeline) + if pipe_config is None: + logger.error("Trigger %s for stream %s has unknown pipeline %s" % ( + stream.name, stream.id, pipeline)) + self._expire_error_stream(stream) + return False + if not self._run_pipeline(stream, trigger_def, pipeline, pipe_config): + self._expire_error_stream(stream) + return False + else: + logger.debug("No expire pipeline for stream %s. Nothing to do." % ( + stream.id)) + self._complete_stream(stream) + self.streams_expired += 1 + return True + + def process_ready_streams(self, batch_size, expire=False): + streams = self.db.get_ready_streams(batch_size, self.current_time(), + expire=expire) + stream_ct = len(streams) + if expire: + logger.debug("Loaded %s streams to expire." % stream_ct) + else: + logger.debug("Loaded %s streams to fire." 
% stream_ct) + + random.shuffle(streams) + for stream in streams: + if expire: + self.expire_stream(stream) + else: + self.fire_stream(stream) + self.streams_loaded += stream_ct + return stream_ct + + def run(self): + while True: + fire_ct = self.process_ready_streams(self.pipeline_worker_batch_size) + expire_ct = self.process_ready_streams(self.pipeline_worker_batch_size, + expire=True) + + if (self.current_time() - self.last_status).seconds > self.statistics_period: + self._log_statistics() + + if not fire_ct and not expire_ct: + logger.debug("No streams to fire or expire. Sleeping...") + time.sleep(self.pipeline_worker_delay) diff --git a/winchester/trigger_manager.py b/winchester/trigger_manager.py index ebdbe16..6ec4978 100644 --- a/winchester/trigger_manager.py +++ b/winchester/trigger_manager.py @@ -1,8 +1,202 @@ -from winchester.db import DBInterface +import datetime +import logging +from stackdistiller import distiller, condenser +import simport + +from winchester.db import DBInterface, DuplicateError +from winchester.config import ConfigManager, ConfigSection, ConfigItem +from winchester.definition import TriggerDefinition + + +logger = logging.getLogger(__name__) + + +class EventCondenser(condenser.CondenserBase): + + def __init__(self, dbi): + self.dbi = dbi + self.clear() + + def clear(self): + self.traits = dict() + self.event_type = None + self.message_id = None + self.timestamp = None + + def add_trait(self, name, trait_type, value): + self.traits[name] = value + + def add_envelope_info(self, event_type, message_id, when): + self.event_type = event_type + self.message_id = message_id + self.timestamp = when + + def get_event(self): + event = self.traits.copy() + event['message_id'] = self.message_id + event['timestamp'] = self.timestamp + event['event_type'] = self.event_type + return event + + def validate(self): + if self.event_type is None: + return False + if self.message_id is None: + return False + if self.timestamp is None: + return False + if not self.traits: + return False + return True class TriggerManager(object): - def __init__(self, config): + @classmethod + def config_description(cls): + return dict(config_path=ConfigItem(help="Path(s) to find additional config files", + multiple=True, default='.'), + distiller_config=ConfigItem(required=True, + help="Name of distiller config file " + "describing what to extract from the " + "notifications"), + distiller_trait_plugins=ConfigItem(help="dictionary of trait plugins to load " + "for stackdistiller. Classes specified with " + "simport syntax. 
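
Everything PipelineManager consumes arrives through that config mapping: pipeline_handlers maps handler names to simport class paths, and the file named by pipeline_config maps each pipeline name to the list of handler configs that Pipeline.check_handler_config() validates at startup. As a minimal sketch of driving the manager in-process (winchester.yaml matches the worker's default below; the file's contents are assumed, not shipped with this change):

    # A sketch, not the shipped entry point: build a PipelineManager straight
    # from a config file and run its fire/expire loop in the foreground.
    from winchester.config import ConfigManager
    from winchester.pipeline_manager import PipelineManager

    conf = ConfigManager.load_config_file('winchester.yaml')
    manager = PipelineManager(conf)
    manager.run()  # fires ready streams, expires stale ones, sleeps when idle
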
diff --git a/winchester/trigger_manager.py b/winchester/trigger_manager.py
index ebdbe16..6ec4978 100644
--- a/winchester/trigger_manager.py
+++ b/winchester/trigger_manager.py
@@ -1,8 +1,202 @@
-from winchester.db import DBInterface
+import datetime
+import logging
+
+from stackdistiller import distiller, condenser
+import simport
+
+from winchester.db import DBInterface, DuplicateError
+from winchester.config import ConfigManager, ConfigSection, ConfigItem
+from winchester.definition import TriggerDefinition
+
+
+logger = logging.getLogger(__name__)
+
+
+class EventCondenser(condenser.CondenserBase):
+
+    def __init__(self, dbi):
+        self.dbi = dbi
+        self.clear()
+
+    def clear(self):
+        self.traits = dict()
+        self.event_type = None
+        self.message_id = None
+        self.timestamp = None
+
+    def add_trait(self, name, trait_type, value):
+        self.traits[name] = value
+
+    def add_envelope_info(self, event_type, message_id, when):
+        self.event_type = event_type
+        self.message_id = message_id
+        self.timestamp = when
+
+    def get_event(self):
+        event = self.traits.copy()
+        event['message_id'] = self.message_id
+        event['timestamp'] = self.timestamp
+        event['event_type'] = self.event_type
+        return event
+
+    def validate(self):
+        if self.event_type is None:
+            return False
+        if self.message_id is None:
+            return False
+        if self.timestamp is None:
+            return False
+        if not self.traits:
+            return False
+        return True
 
 
 class TriggerManager(object):
 
-    def __init__(self, config):
+    @classmethod
+    def config_description(cls):
+        return dict(config_path=ConfigItem(help="Path(s) to find additional config files",
+                                           multiple=True, default='.'),
+                    distiller_config=ConfigItem(required=True,
+                                                help="Name of distiller config file "
+                                                     "describing what to extract from the "
+                                                     "notifications"),
+                    distiller_trait_plugins=ConfigItem(help="dictionary of trait plugins to load "
+                                                            "for stackdistiller. Classes specified with "
+                                                            "simport syntax. See stackdistiller and "
+                                                            "simport docs for more info", default=dict()),
+                    catch_all_notifications=ConfigItem(help="Store basic info for all notifications,"
+                                                            " even if not listed in distiller config",
+                                                       default=False),
+                    statistics_period=ConfigItem(help="Emit stats on event counts, etc every "
+                                                      "this many seconds", default=10),
+                    database=ConfigSection(help="Database connection info.",
+                                           config_description=DBInterface.config_description()),
+                    trigger_definitions=ConfigItem(required=True,
+                                                   help="Name of trigger definitions file "
+                                                        "defining trigger conditions and what events to "
+                                                        "process for each stream"),
+                    )
+
+    def __init__(self, config, db=None, stackdistiller=None, trigger_defs=None):
+        config = ConfigManager.wrap(config, self.config_description())
         self.config = config
-        self.db = DBInterface(config['database'])
+        config.check_config()
+        config.add_config_path(*config['config_path'])
+
+        if db is not None:
+            self.db = db
+        else:
+            self.db = DBInterface(config['database'])
+        if stackdistiller is not None:
+            self.distiller = stackdistiller
+        else:
+            dist_config = config.load_file(config['distiller_config'])
+            plugmap = self._load_plugins(config['distiller_trait_plugins'],
+                                         distiller.DEFAULT_PLUGINMAP)
+            self.distiller = distiller.Distiller(dist_config,
+                                                 trait_plugin_map=plugmap,
+                                                 catchall=config['catch_all_notifications'])
+        if trigger_defs is not None:
+            self.trigger_definitions = trigger_defs
+        else:
+            defs = config.load_file(config['trigger_definitions'])
+            self.trigger_definitions = [TriggerDefinition(conf) for conf in defs]
+        self.statistics_period = config['statistics_period']
+        self.saved_events = 0
+        self.received = 0
+        self.last_status = self.current_time()
+
+    @classmethod
+    def _load_plugins(cls, plug_map, defaults=None):
+        plugins = dict()
+        if defaults is not None:
+            plugins.update(defaults)
+        for name, cls_string in plug_map.items():
+            try:
+                plugins[name] = simport.load(cls_string)
+            except simport.ImportFailed as e:
+                logger.error("Could not load plugin %s: Import failed. %s" % (
+                    name, e))
+            except (simport.MissingMethodOrFunction,
+                    simport.MissingModule,
+                    simport.BadDirectory) as e:
+                logger.error("Could not load plugin %s: Not found. %s" % (
+                    name, e))
+        return plugins
+
+    def current_time(self):
+        # here so it's easily overridden.
+        return datetime.datetime.utcnow()
+
+    def save_event(self, event):
+        traits = event.copy()
+        message_id = traits.pop('message_id')
+        timestamp = traits.pop('timestamp')
+        event_type = traits.pop('event_type')
+        try:
+            self.db.create_event(message_id, event_type,
+                                 timestamp, traits)
+            self.saved_events += 1
+            return True
+        except DuplicateError:
+            logger.info("Received duplicate event %s, ignoring." % message_id)
+            return False
+
+    def convert_notification(self, notification_body):
+        cond = EventCondenser(self.db)
+        cond.clear()
+        self.received += 1
+        if self.distiller.to_event(notification_body, cond):
+            if cond.validate():
+                event = cond.get_event()
+                if self.save_event(event):
+                    return event
+            else:
+                logger.warning("Received invalid event")
+        else:
+            event_type = notification_body.get('event_type', '**no event_type**')
+            message_id = notification_body.get('message_id', '**no id**')
+            logger.info("Dropping unconverted %s notification %s" % (
+                event_type, message_id))
+        return None
+
+    def _log_statistics(self):
+        logger.info("Received %s notifications. Saved %s events." % (
+            self.received, self.saved_events))
+        self.received = 0
+        self.saved_events = 0
+        self.last_status = self.current_time()
+
+    def _add_or_create_stream(self, trigger_def, event, dist_traits):
+        stream = self.db.get_active_stream(trigger_def.name, dist_traits,
+                                           self.current_time())
+        if stream is None:
+            stream = self.db.create_stream(trigger_def.name, event, dist_traits,
+                                           trigger_def.expiration)
+            logger.debug("Created new stream %s for %s: distinguished by %s" % (
+                stream.id, trigger_def.name, str(dist_traits)))
+        else:
+            self.db.add_event_stream(stream, event, trigger_def.expiration)
+        return stream
+
+    def _ready_to_fire(self, stream, trigger_def):
+        timestamp = trigger_def.get_fire_timestamp(self.current_time())
+        self.db.stream_ready_to_fire(stream, timestamp)
+        logger.debug("Stream %s ready to fire at %s" % (
+            stream.id, timestamp))
+
+    def add_event(self, event):
+        for trigger_def in self.trigger_definitions:
+            matched_criteria = trigger_def.match(event)
+            if matched_criteria:
+                dist_traits = trigger_def.get_distinguishing_traits(
+                    event, matched_criteria)
+                stream = self._add_or_create_stream(trigger_def, event,
+                                                    dist_traits)
+                if stream.fire_timestamp is None:
+                    if trigger_def.should_fire(self.db.get_stream_events(stream)):
+                        self._ready_to_fire(stream, trigger_def)
+        if (self.current_time() - self.last_status).seconds > self.statistics_period:
+            self._log_statistics()
+
+    def add_notification(self, notification_body):
+        event = self.convert_notification(notification_body)
+        if event:
+            self.add_event(event)
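
The conversion path above runs in three steps: convert_notification() feeds the raw notification through the stackdistiller Distiller into an EventCondenser, save_event() persists the condensed event (dropping duplicates via DuplicateError), and add_event() matches the event against every TriggerDefinition, creating or extending a stream and marking it ready to fire when should_fire() says so. A sketch of that flow, with an entirely hypothetical notification body loosely shaped like the compute.instance.* definitions in etc/event_definitions.yaml:

    # Assumes trigger_manager is a TriggerManager built from a valid config;
    # the notification body is a made-up example, not real data.
    notification = {
        'event_type': 'compute.instance.create.start',
        'message_id': 'example-0001',
        'timestamp': '2014-08-07 07:34:14',
        'publisher_id': 'compute.node1',
        'payload': {'instance_id': 'example-instance',
                    'tenant_id': 'example-tenant'},
    }

    trigger_manager.add_notification(notification)
    # equivalent to: event = trigger_manager.convert_notification(notification)
    #                if event: trigger_manager.add_event(event)
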
diff --git a/winchester/worker.py b/winchester/worker.py
new file mode 100644
index 0000000..499f362
--- /dev/null
+++ b/winchester/worker.py
@@ -0,0 +1,40 @@
+import argparse
+import logging
+from logging.config import fileConfig
+
+import daemon
+
+from winchester.config import ConfigManager
+from winchester.pipeline_manager import PipelineManager
+
+
+logger = logging.getLogger(__name__)
+
+
+def main():
+    parser = argparse.ArgumentParser(description="Winchester pipeline worker")
+    parser.add_argument('--config', '-c', default='winchester.yaml',
+                        help='The name of the winchester config file')
+    parser.add_argument('--daemon', '-d', action='store_true',
+                        help='Run in daemon mode.')
+    args = parser.parse_args()
+    conf = ConfigManager.load_config_file(args.config)
+
+    if 'logging_config' in conf:
+        fileConfig(conf['logging_config'])
+    else:
+        logging.basicConfig()
+        if 'log_level' in conf:
+            level = conf['log_level']
+            level = getattr(logging, level.upper())
+            logging.getLogger('winchester').setLevel(level)
+
+    pipe = PipelineManager(conf)
+    if args.daemon:
+        print "Backgrounding for daemon mode."
+        with daemon.DaemonContext():
+            pipe.run()
+    else:
+        pipe.run()
+
+
+if __name__ == '__main__':
+    main()
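
From a checkout, the worker can then be exercised directly; winchester.yaml is only the argparse default, so point --config at whichever file holds the sections described above:

    python winchester/worker.py --config winchester.yaml
    python winchester/worker.py --config winchester.yaml --daemon

The second form detaches into the background with python-daemon's DaemonContext before entering PipelineManager.run().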