Add optional database trimming for old events.

Allows you to trim events older than a configurable time from
the events database.

(Remerging to fix timex parse error)

Change-Id: Iaa290705815d1c3ac23c2ca7370a5d705f1f834c
Monsyne Dragon 2015-08-17 17:47:35 +00:00
parent a15b95dbf3
commit 665c9ad328
4 changed files with 77 additions and 2 deletions
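The three new options sit in the pipeline worker's configuration next to the existing purge_completed_streams setting. As a minimal sketch of enabling trimming (not part of this commit; a plain dict-style config is assumed here, with key names and defaults taken from the ConfigItem declarations in the pipeline manager diff below):

    # Hypothetical pipeline-worker config excerpt with event trimming enabled.
    # Only the trim_* keys are introduced by this commit; the age and batch
    # size shown override the defaults ("$timestamp - 14d" and 100).
    pipeline_worker_config = {
        'purge_completed_streams': True,
        'trim_events': True,
        'trim_events_age': '$timestamp - 30d',
        'trim_events_batch_size': 500,
    }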

View File

@@ -1,7 +1,7 @@
 [metadata]
 description-file = README.md
 name = winchester
-version = 0.56
+version = 0.57
 author = Monsyne Dragon
 author_email = mdragon@rackspace.com
 summary = An OpenStack notification event processing library.

View File

@@ -531,6 +531,21 @@ class TestDB(unittest.TestCase):
         self.assertEqual(streams[0]['id'], 3)
         self.assertEqual(streams[1]['id'], 4)
 
+    def test_purge_events(self):
+        self.db.purge_events([1])
+        events = self.db.find_events()
+        self.assertEqual(3, len(events))
+
+    def test_find_older_events(self):
+        d1 = datetime.datetime(2014, 8, 1, 2, 10, 12, 1)
+        d2 = datetime.datetime(2014, 8, 1, 4, 57, 55, 43)
+        event_ids = self.db.find_older_events(d1, 2)
+        self.assertEqual(event_ids, [3])
+        event_ids = self.db.find_older_events(d2, 2)
+        self.assertEqual(event_ids, [3, 4])
+        event_ids = self.db.find_older_events(d2, 1)
+        self.assertEqual(event_ids, [3])
+
     def test_find_events(self):
         events = self.db.find_events()
         self.assertEqual(4, len(events))

View File

@@ -24,6 +24,7 @@ from sqlalchemy.exc import IntegrityError
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm.exc import NoResultFound
 from sqlalchemy.orm import sessionmaker
+from sqlalchemy.sql import select
 from winchester.config import ConfigItem
 from winchester.config import ConfigManager
 from winchester import models
@@ -155,6 +156,33 @@ class DBInterface(object):
                 "No event found with message_id %s!" % message_id)
         return e.as_dict
 
+    @sessioned
+    def find_older_events(self, purge_date, batchsize, session=None):
+        # For speed, we do this below the ORM layer. (mdragon)
+        conn = session.connection()
+        event_table = models.Event.__table__
+        q = select([event_table.c.id])
+        q = q.where(event_table.c.generated < purge_date)
+        q = q.order_by(event_table.c.generated.asc())
+        q = q.limit(batchsize)
+        return [r[0] for r in conn.execute(q).fetchall()]
+
+    @sessioned
+    def purge_events(self, event_ids, session=None):
+        # For speed, we do this below the ORM layer. (mdragon)
+        conn = session.connection()
+        dq = models.stream_event_table.delete()
+        dq = dq.where(models.stream_event_table.c.event_id.in_(event_ids))
+        conn.execute(dq)
+        trait_table = models.Trait.__table__
+        dq = trait_table.delete()
+        dq = dq.where(trait_table.c.event_id.in_(event_ids))
+        conn.execute(dq)
+        event_table = models.Event.__table__
+        dq = event_table.delete()
+        dq = dq.where(event_table.c.id.in_(event_ids))
+        conn.execute(dq)
+
     @sessioned
     def find_events(self, from_datetime=None, to_datetime=None,
                     event_name=None, traits=None, mark=None, limit=None,
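The two new methods above are meant to work as a pair: find_older_events returns at most batchsize ids of events generated before purge_date, oldest first, and purge_events then deletes the stream associations, traits, and event rows for those ids, all below the ORM layer for speed. A minimal usage sketch (not part of the diff; db stands for an already-configured DBInterface instance):

    # Hypothetical batch-drain loop built on the two new DBInterface methods.
    import datetime

    cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=14)
    while True:
        event_ids = db.find_older_events(cutoff, 100)
        if not event_ids:
            break  # nothing older than the cutoff remains
        db.purge_events(event_ids)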

View File

@@ -19,6 +19,7 @@ import random
 import simport
 import six
 import time
+import timex
 
 from winchester.config import ConfigItem
 from winchester.config import ConfigManager
@@ -145,6 +146,16 @@ class PipelineManager(object):
                 help="Delete successfully proccessed "
                      "streams when finished?",
                 default=True),
+            trim_events=ConfigItem(
+                help="Delete events older than a configurable time.",
+                default=False),
+            trim_events_age=ConfigItem(
+                help="Delete events older than this (timex expr).",
+                default="$timestamp - 14d"),
+            trim_events_batch_size=ConfigItem(
+                help="Maximum number of events for pipeline "
+                     "worker(s) to trim at a time",
+                default=100),
         ))
         return configs
 
@@ -208,6 +219,15 @@ class PipelineManager(object):
         self.pipeline_worker_delay = config['pipeline_worker_delay']
         self.statistics_period = config['statistics_period']
         self.purge_completed_streams = config['purge_completed_streams']
+        self.trim_events = config['trim_events']
+        self.trim_events_batch_size = config['trim_events_batch_size']
+        try:
+            self.trim_events_age = timex.parse(str(config['trim_events_age']))
+        except timex.TimexError:
+            logger.error("Invalid trim event expression: %s Event trimming "
+                         "disabled." % config['trim_events_age'])
+            self.trim_events_age = None
+            self.trim_events = False
         self.streams_fired = 0
         self.streams_expired = 0
         self.streams_loaded = 0
@@ -395,6 +415,14 @@ class PipelineManager(object):
         self.streams_loaded += stream_ct
         return stream_ct
 
+    def process_trim_events(self):
+        trim_date = self.trim_events_age().timestamp
+        event_ids = self.db.find_older_events(trim_date,
+                                              self.trim_events_batch_size)
+        logger.debug("Trimming %s old events" % len(event_ids))
+        self.db.purge_events(event_ids)
+        return len(event_ids)
+
     def run(self):
         while True:
             try:
@@ -404,11 +432,15 @@
                     self.pipeline_worker_batch_size,
                     expire=True)
 
+                trim_ct = 0
+                if self.trim_events:
+                    trim_ct = self.process_trim_events()
+
                 if ((self.current_time() - self.last_status).seconds
                         > self.statistics_period):
                     self._log_statistics()
 
-                if not fire_ct and not expire_ct:
+                if not fire_ct and not expire_ct and not trim_ct:
                     logger.debug("No streams to fire or expire. Sleeping...")
                     time.sleep(self.pipeline_worker_delay)
             except DatabaseConnectionError:
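For context on the trim logic above: trim_events_age is parsed once in __init__ and the resulting timex expression is re-evaluated on every pass of process_trim_events to get a fresh cutoff. A rough standalone sketch of that parse-once, evaluate-per-pass pattern (an illustration only, using just the timex calls that appear in the code above; exact timex semantics are assumed):

    # Hypothetical illustration of deriving the trim cutoff from the
    # configured timex expression.
    import timex

    try:
        trim_events_age = timex.parse("$timestamp - 14d")  # the ConfigItem default
    except timex.TimexError:
        trim_events_age = None  # invalid expression: trimming would be disabled

    if trim_events_age is not None:
        trim_date = trim_events_age().timestamp  # evaluated at trim time
        # trim_date is what gets passed to DBInterface.find_older_events()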