diff --git a/setup.cfg b/setup.cfg
index 3d023e8..2ba939e 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,7 +1,7 @@
 [metadata]
 description-file = README.md
 name = winchester
-version = 0.56
+version = 0.57
 author = Monsyne Dragon
 author_email = mdragon@rackspace.com
 summary = An OpenStack notification event processing library.
diff --git a/tests/test_db.py b/tests/test_db.py
index 2864fdc..7e455a5 100644
--- a/tests/test_db.py
+++ b/tests/test_db.py
@@ -531,6 +531,21 @@ class TestDB(unittest.TestCase):
         self.assertEqual(streams[0]['id'], 3)
         self.assertEqual(streams[1]['id'], 4)
 
+    def test_purge_events(self):
+        self.db.purge_events([1])
+        events = self.db.find_events()
+        self.assertEqual(3, len(events))
+
+    def test_find_older_events(self):
+        d1 = datetime.datetime(2014, 8, 1, 2, 10, 12, 1)
+        d2 = datetime.datetime(2014, 8, 1, 4, 57, 55, 43)
+        event_ids = self.db.find_older_events(d1, 2)
+        self.assertEqual(event_ids, [3])
+        event_ids = self.db.find_older_events(d2, 2)
+        self.assertEqual(event_ids, [3, 4])
+        event_ids = self.db.find_older_events(d2, 1)
+        self.assertEqual(event_ids, [3])
+
     def test_find_events(self):
         events = self.db.find_events()
         self.assertEqual(4, len(events))
diff --git a/winchester/db/interface.py b/winchester/db/interface.py
index 8393f2a..1f3064b 100644
--- a/winchester/db/interface.py
+++ b/winchester/db/interface.py
@@ -24,6 +24,7 @@ from sqlalchemy.exc import IntegrityError
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm.exc import NoResultFound
 from sqlalchemy.orm import sessionmaker
+from sqlalchemy.sql import select
 from winchester.config import ConfigItem
 from winchester.config import ConfigManager
 from winchester import models
@@ -155,6 +156,33 @@ class DBInterface(object):
                 "No event found with message_id %s!" % message_id)
         return e.as_dict
 
+    @sessioned
+    def find_older_events(self, purge_date, batchsize, session=None):
+        # For speed, we do this below the ORM layer. (mdragon)
+        conn = session.connection()
+        event_table = models.Event.__table__
+        q = select([event_table.c.id])
+        q = q.where(event_table.c.generated < purge_date)
+        q = q.order_by(event_table.c.generated.asc())
+        q = q.limit(batchsize)
+        return [r[0] for r in conn.execute(q).fetchall()]
+
+    @sessioned
+    def purge_events(self, event_ids, session=None):
+        # For speed, we do this below the ORM layer. (mdragon)
+        conn = session.connection()
+        dq = models.stream_event_table.delete()
+        dq = dq.where(models.stream_event_table.c.event_id.in_(event_ids))
+        conn.execute(dq)
+        trait_table = models.Trait.__table__
+        dq = trait_table.delete()
+        dq = dq.where(trait_table.c.event_id.in_(event_ids))
+        conn.execute(dq)
+        event_table = models.Event.__table__
+        dq = event_table.delete()
+        dq = dq.where(event_table.c.id.in_(event_ids))
+        conn.execute(dq)
+
     @sessioned
     def find_events(self, from_datetime=None, to_datetime=None,
                     event_name=None, traits=None, mark=None, limit=None,
diff --git a/winchester/pipeline_manager.py b/winchester/pipeline_manager.py
index cba0a07..1bccbc1 100644
--- a/winchester/pipeline_manager.py
+++ b/winchester/pipeline_manager.py
@@ -19,6 +19,7 @@ import random
 import simport
 import six
 import time
+import timex
 
 from winchester.config import ConfigItem
 from winchester.config import ConfigManager
@@ -145,6 +146,16 @@ class PipelineManager(object):
                 help="Delete successfully proccessed "
                      "streams when finished?",
                 default=True),
+            trim_events=ConfigItem(
+                help="Delete events older than a configurable time.",
+                default=False),
+            trim_events_age=ConfigItem(
+                help="Delete events older than this (timex expr).",
+                default="$timestamp - 14d"),
+            trim_events_batch_size=ConfigItem(
+                help="Maximum number of events for pipeline "
+                     "worker(s) to trim at a time",
+                default=100),
         ))
         return configs
 
@@ -208,6 +219,15 @@ class PipelineManager(object):
         self.pipeline_worker_delay = config['pipeline_worker_delay']
         self.statistics_period = config['statistics_period']
         self.purge_completed_streams = config['purge_completed_streams']
+        self.trim_events = config['trim_events']
+        self.trim_events_batch_size = config['trim_events_batch_size']
+        try:
+            self.trim_events_age = timex.parse(str(config['trim_events_age']))
+        except timex.TimexError:
+            logger.error("Invalid trim event expression: %s Event trimming "
+                         "disabled." % config['trim_events_age'])
+            self.trim_events_age = None
+            self.trim_events = False
         self.streams_fired = 0
         self.streams_expired = 0
         self.streams_loaded = 0
@@ -395,6 +415,14 @@ class PipelineManager(object):
             self.streams_loaded += stream_ct
         return stream_ct
 
+    def process_trim_events(self):
+        trim_date = self.trim_events_age().timestamp
+        event_ids = self.db.find_older_events(trim_date,
+                                              self.trim_events_batch_size)
+        logger.debug("Trimming %s old events" % len(event_ids))
+        self.db.purge_events(event_ids)
+        return len(event_ids)
+
     def run(self):
         while True:
             try:
@@ -404,11 +432,15 @@
                     self.pipeline_worker_batch_size,
                     expire=True)
 
+                trim_ct = 0
+                if self.trim_events:
+                    trim_ct = self.process_trim_events()
+
                 if ((self.current_time() - self.last_status).seconds
                         > self.statistics_period):
                     self._log_statistics()
 
-                if not fire_ct and not expire_ct:
+                if not fire_ct and not expire_ct and not trim_ct:
                     logger.debug("No streams to fire or expire. Sleeping...")
                     time.sleep(self.pipeline_worker_delay)
             except DatabaseConnectionError:
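
Reviewer note: a minimal usage sketch of the trimming path this patch adds, outside the worker loop. The DBInterface construction and the config values shown here are illustrative assumptions, not part of this change; only find_older_events, purge_events, and the timex call mirror the code above.

# Hypothetical sketch only. The dict passed to DBInterface and the 'url'
# key are assumptions for illustration; values mirror the new ConfigItem
# defaults (trim_events_age="$timestamp - 14d", trim_events_batch_size=100).
import timex

from winchester.db.interface import DBInterface

trim_events_age = timex.parse("$timestamp - 14d")
trim_events_batch_size = 100

db = DBInterface(dict(url='sqlite://'))        # assumed database config shape
# Called with no arguments, as in process_trim_events above; .timestamp is
# the cutoff compared against Event.generated in find_older_events.
trim_date = trim_events_age().timestamp
event_ids = db.find_older_events(trim_date, trim_events_batch_size)
db.purge_events(event_ids)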