Add max_count argument for clear_expired_data call
When there are a lot of events in the database, panko-expire
consumes a lot of memory to delete all records with a single
call. To avoid this behaviour a new config option
`events_delete_batch_size` was introduced.
Change-Id: Icf83ffe089301b3782273923f18efd4d209131c2
(cherry picked from commit ec84ed8aa7
)
This commit is contained in:
parent
4e8597a149
commit
0a8d5b4fc0
|
@ -34,7 +34,20 @@ def expirer():
|
|||
if conf.database.event_time_to_live > 0:
|
||||
LOG.debug("Clearing expired event data")
|
||||
conn = storage.get_connection_from_config(conf)
|
||||
conn.clear_expired_data(conf.database.event_time_to_live)
|
||||
max_count = conf.database.events_delete_batch_size
|
||||
try:
|
||||
if max_count > 0:
|
||||
conn.clear_expired_data(conf.database.event_time_to_live,
|
||||
max_count)
|
||||
else:
|
||||
deleted = max_count = 100
|
||||
while deleted and deleted > 0:
|
||||
deleted = conn.clear_expired_data(
|
||||
conf.database.event_time_to_live,
|
||||
max_count)
|
||||
except TypeError:
|
||||
LOG.warning("Storage driver does not support "
|
||||
"'events_delete_batch_size' config option.")
|
||||
else:
|
||||
LOG.info("Nothing to clean, database event time to live "
|
||||
"is disabled")
|
||||
|
|
|
@ -32,6 +32,11 @@ OPTS = [
|
|||
default=-1,
|
||||
help=("Number of seconds that events are kept "
|
||||
"in the database for (<= 0 means forever).")),
|
||||
cfg.IntOpt('events_delete_batch_size',
|
||||
default=0,
|
||||
min=0,
|
||||
help=("Number of events to be deleted in one iteration "
|
||||
"from the database for (0 means all).")),
|
||||
cfg.StrOpt('event_connection',
|
||||
secret=True,
|
||||
deprecated_for_removal=True,
|
||||
|
|
|
@ -122,10 +122,11 @@ class Connection(object):
|
|||
return cls.STORAGE_CAPABILITIES
|
||||
|
||||
@staticmethod
|
||||
def clear_expired_data(ttl):
|
||||
def clear_expired_data(ttl, max_count=None):
|
||||
"""Clear expired data from the backend storage system.
|
||||
|
||||
Clearing occurs according to the time-to-live.
|
||||
:param ttl: Number of seconds to keep records for.
|
||||
:param max_count: Number of records to delete.
|
||||
"""
|
||||
raise panko.NotImplementedError('Clearing events not implemented')
|
||||
|
|
|
@ -22,11 +22,12 @@ class Connection(base.Connection):
|
|||
"""Log event data."""
|
||||
|
||||
@staticmethod
|
||||
def clear_expired_data(ttl):
|
||||
def clear_expired_data(ttl, max_count):
|
||||
"""Clear expired data from the backend storage system.
|
||||
|
||||
Clearing occurs according to the time-to-live.
|
||||
|
||||
:param ttl: Number of seconds to keep records for.
|
||||
:param max_count: Number of records to delete.
|
||||
"""
|
||||
LOG.info("Dropping event data with TTL %d", ttl)
|
||||
LOG.info("Dropping %d events data with TTL %d", max_count, ttl)
|
||||
|
|
|
@ -96,12 +96,13 @@ class Connection(pymongo_base.Connection):
|
|||
# Connection will be reopened automatically if needed
|
||||
self.conn.close()
|
||||
|
||||
def clear_expired_data(self, ttl):
|
||||
def clear_expired_data(self, ttl, max_count=None):
|
||||
"""Clear expired data from the backend storage system.
|
||||
|
||||
Clearing occurs according to the time-to-live.
|
||||
|
||||
:param ttl: Number of seconds to keep records for.
|
||||
:param max_count: Number of records to delete (not used for MongoDB).
|
||||
"""
|
||||
self.update_ttl(ttl, 'event_ttl', 'timestamp', self.db.event)
|
||||
LOG.info("Clearing expired event data is based on native "
|
||||
|
|
|
@ -462,12 +462,13 @@ class Connection(base.Connection):
|
|||
dtype=dtype,
|
||||
value=v)
|
||||
|
||||
def clear_expired_data(self, ttl):
|
||||
def clear_expired_data(self, ttl, max_count):
|
||||
"""Clear expired data from the backend storage system.
|
||||
|
||||
Clearing occurs according to the time-to-live.
|
||||
|
||||
:param ttl: Number of seconds to keep records for.
|
||||
:param max_count: Number of records to delete.
|
||||
"""
|
||||
session = self._engine_facade.get_session()
|
||||
with session.begin():
|
||||
|
@ -475,17 +476,23 @@ class Connection(base.Connection):
|
|||
event_q = (session.query(models.Event.id)
|
||||
.filter(models.Event.generated < end))
|
||||
|
||||
event_subq = event_q.subquery()
|
||||
# NOTE(e0ne): it's not optimal from the performance point of
|
||||
# view but it works with all databases.
|
||||
ids = [i[0] for i in event_q.limit(max_count)]
|
||||
for trait_model in [models.TraitText, models.TraitInt,
|
||||
models.TraitFloat, models.TraitDatetime]:
|
||||
(session.query(trait_model)
|
||||
.filter(trait_model.event_id.in_(event_subq))
|
||||
.delete(synchronize_session="fetch"))
|
||||
event_rows = event_q.delete()
|
||||
session.query(trait_model).filter(
|
||||
trait_model.event_id.in_(ids)
|
||||
).delete(synchronize_session="fetch")
|
||||
event_rows = session.query(models.Event).filter(
|
||||
models.Event.id.in_(ids)
|
||||
).delete(synchronize_session="fetch")
|
||||
|
||||
# remove EventType and TraitType with no corresponding
|
||||
# matching events and traits
|
||||
# matching events
|
||||
(session.query(models.EventType)
|
||||
.filter(~models.EventType.events.any())
|
||||
.delete(synchronize_session="fetch"))
|
||||
LOG.info("%d events are removed from database", event_rows)
|
||||
|
||||
return event_rows
|
||||
|
|
|
@ -31,22 +31,22 @@ class IndexTest(tests_db.TestBase):
|
|||
|
||||
def test_event_ttl_index_absent(self):
|
||||
# create a fake index and check it is deleted
|
||||
self.conn.clear_expired_data(-1)
|
||||
self.conn.clear_expired_data(-1, 0)
|
||||
self.assertNotIn("event_ttl",
|
||||
self.conn.db.event.index_information())
|
||||
|
||||
self.conn.clear_expired_data(456789)
|
||||
self.conn.clear_expired_data(456789, 0)
|
||||
self.assertEqual(456789,
|
||||
self.conn.db.event.index_information()
|
||||
["event_ttl"]['expireAfterSeconds'])
|
||||
|
||||
def test_event_ttl_index_present(self):
|
||||
self.conn.clear_expired_data(456789)
|
||||
self.conn.clear_expired_data(456789, 0)
|
||||
self.assertEqual(456789,
|
||||
self.conn.db.event.index_information()
|
||||
["event_ttl"]['expireAfterSeconds'])
|
||||
|
||||
self.conn.clear_expired_data(-1)
|
||||
self.conn.clear_expired_data(-1, 0)
|
||||
self.assertNotIn("event_ttl",
|
||||
self.conn.db.event.index_information())
|
||||
|
||||
|
|
|
@ -68,7 +68,7 @@ class EventTTLTest(EventTestBase):
|
|||
@mock.patch.object(timeutils, 'utcnow')
|
||||
def test_clear_expired_data(self, mock_utcnow):
|
||||
mock_utcnow.return_value = datetime.datetime(2013, 12, 31, 10, 0)
|
||||
self.conn.clear_expired_data(3600)
|
||||
self.conn.clear_expired_data(3600, 100)
|
||||
|
||||
events = list(self.conn.get_events(storage.EventFilter()))
|
||||
self.assertEqual(2, len(events))
|
||||
|
|
|
@ -66,7 +66,7 @@ class BinTestCase(base.BaseTestCase):
|
|||
stdout=subprocess.PIPE)
|
||||
out, __ = subp.communicate()
|
||||
self.assertEqual(0, subp.poll())
|
||||
msg = "Dropping %s data with TTL 1" % data_name
|
||||
msg = "Dropping 100 %ss data with TTL 1" % data_name
|
||||
if six.PY3:
|
||||
msg = msg.encode('utf-8')
|
||||
self.assertIn(msg, out)
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
---
|
||||
features:
|
||||
- |
|
||||
A new ``events_delete_batch_size`` config option is introduced to specify
|
||||
a number of events to be deleted in one iteration from the database. It will
|
||||
help when there are a lot of events in the database and panko-expire
|
||||
consumes a lot of memory to delete all records with a single
|
||||
call.
|
||||
fixes:
|
||||
|
|
||||
Fixed an issue where panko-expire consumed too much memory while
|
||||
cleaning up events.
|
Loading…
Reference in New Issue