Add max_count argument for clear_expired_data call

When there are a lot of events in the database, panko-expire
consumes a lot of memory to delete all records with a single
call. To avoid this behaviour, a new config option
`events_delete_batch_size` was introduced.

Change-Id: Icf83ffe089301b3782273923f18efd4d209131c2
(cherry picked from commit ec84ed8aa7)
This commit is contained in:
Ivan Kolodyazhny 2020-06-03 14:11:09 +03:00 committed by Martin Mágr
parent 4e8597a149
commit 0a8d5b4fc0
10 changed files with 58 additions and 18 deletions

View File

@ -34,7 +34,20 @@ def expirer():
if conf.database.event_time_to_live > 0:
LOG.debug("Clearing expired event data")
conn = storage.get_connection_from_config(conf)
conn.clear_expired_data(conf.database.event_time_to_live)
max_count = conf.database.events_delete_batch_size
try:
if max_count > 0:
conn.clear_expired_data(conf.database.event_time_to_live,
max_count)
else:
deleted = max_count = 100
while deleted and deleted > 0:
deleted = conn.clear_expired_data(
conf.database.event_time_to_live,
max_count)
except TypeError:
LOG.warning("Storage driver does not support "
"'events_delete_batch_size' config option.")
else:
LOG.info("Nothing to clean, database event time to live "
"is disabled")

View File

@ -32,6 +32,11 @@ OPTS = [
default=-1,
help=("Number of seconds that events are kept "
"in the database for (<= 0 means forever).")),
cfg.IntOpt('events_delete_batch_size',
default=0,
min=0,
help=("Number of events to be deleted in one iteration "
"from the database for (0 means all).")),
cfg.StrOpt('event_connection',
secret=True,
deprecated_for_removal=True,

View File

@ -122,10 +122,11 @@ class Connection(object):
return cls.STORAGE_CAPABILITIES
@staticmethod
def clear_expired_data(ttl):
def clear_expired_data(ttl, max_count=None):
"""Clear expired data from the backend storage system.
Clearing occurs according to the time-to-live.
:param ttl: Number of seconds to keep records for.
:param max_count: Number of records to delete.
"""
raise panko.NotImplementedError('Clearing events not implemented')

View File

@ -22,11 +22,12 @@ class Connection(base.Connection):
"""Log event data."""
@staticmethod
def clear_expired_data(ttl):
def clear_expired_data(ttl, max_count):
"""Clear expired data from the backend storage system.
Clearing occurs according to the time-to-live.
:param ttl: Number of seconds to keep records for.
:param max_count: Number of records to delete.
"""
LOG.info("Dropping event data with TTL %d", ttl)
LOG.info("Dropping %d events data with TTL %d", max_count, ttl)

View File

@ -96,12 +96,13 @@ class Connection(pymongo_base.Connection):
# Connection will be reopened automatically if needed
self.conn.close()
def clear_expired_data(self, ttl):
def clear_expired_data(self, ttl, max_count=None):
"""Clear expired data from the backend storage system.
Clearing occurs according to the time-to-live.
:param ttl: Number of seconds to keep records for.
:param max_count: Number of records to delete (not used for MongoDB).
"""
self.update_ttl(ttl, 'event_ttl', 'timestamp', self.db.event)
LOG.info("Clearing expired event data is based on native "

View File

@ -462,12 +462,13 @@ class Connection(base.Connection):
dtype=dtype,
value=v)
def clear_expired_data(self, ttl):
def clear_expired_data(self, ttl, max_count):
"""Clear expired data from the backend storage system.
Clearing occurs according to the time-to-live.
:param ttl: Number of seconds to keep records for.
:param max_count: Number of records to delete.
"""
session = self._engine_facade.get_session()
with session.begin():
@ -475,17 +476,23 @@ class Connection(base.Connection):
event_q = (session.query(models.Event.id)
.filter(models.Event.generated < end))
event_subq = event_q.subquery()
# NOTE(e0ne): this is not optimal from a performance point of
# view, but it works with all databases.
ids = [i[0] for i in event_q.limit(max_count)]
for trait_model in [models.TraitText, models.TraitInt,
models.TraitFloat, models.TraitDatetime]:
(session.query(trait_model)
.filter(trait_model.event_id.in_(event_subq))
.delete(synchronize_session="fetch"))
event_rows = event_q.delete()
session.query(trait_model).filter(
trait_model.event_id.in_(ids)
).delete(synchronize_session="fetch")
event_rows = session.query(models.Event).filter(
models.Event.id.in_(ids)
).delete(synchronize_session="fetch")
# remove EventType and TraitType with no corresponding
# matching events and traits
# matching events
(session.query(models.EventType)
.filter(~models.EventType.events.any())
.delete(synchronize_session="fetch"))
LOG.info("%d events are removed from database", event_rows)
return event_rows

View File

@ -31,22 +31,22 @@ class IndexTest(tests_db.TestBase):
def test_event_ttl_index_absent(self):
# create a fake index and check it is deleted
self.conn.clear_expired_data(-1)
self.conn.clear_expired_data(-1, 0)
self.assertNotIn("event_ttl",
self.conn.db.event.index_information())
self.conn.clear_expired_data(456789)
self.conn.clear_expired_data(456789, 0)
self.assertEqual(456789,
self.conn.db.event.index_information()
["event_ttl"]['expireAfterSeconds'])
def test_event_ttl_index_present(self):
self.conn.clear_expired_data(456789)
self.conn.clear_expired_data(456789, 0)
self.assertEqual(456789,
self.conn.db.event.index_information()
["event_ttl"]['expireAfterSeconds'])
self.conn.clear_expired_data(-1)
self.conn.clear_expired_data(-1, 0)
self.assertNotIn("event_ttl",
self.conn.db.event.index_information())

View File

@ -68,7 +68,7 @@ class EventTTLTest(EventTestBase):
@mock.patch.object(timeutils, 'utcnow')
def test_clear_expired_data(self, mock_utcnow):
mock_utcnow.return_value = datetime.datetime(2013, 12, 31, 10, 0)
self.conn.clear_expired_data(3600)
self.conn.clear_expired_data(3600, 100)
events = list(self.conn.get_events(storage.EventFilter()))
self.assertEqual(2, len(events))

View File

@ -66,7 +66,7 @@ class BinTestCase(base.BaseTestCase):
stdout=subprocess.PIPE)
out, __ = subp.communicate()
self.assertEqual(0, subp.poll())
msg = "Dropping %s data with TTL 1" % data_name
msg = "Dropping 100 %ss data with TTL 1" % data_name
if six.PY3:
msg = msg.encode('utf-8')
self.assertIn(msg, out)

View File

@ -0,0 +1,12 @@
---
features:
- |
A new ``events_delete_batch_size`` config option is introduced to specify
the number of events to be deleted in one iteration from the database. It
helps when there are a lot of events in the database and panko-expire
consumes a lot of memory to delete all records with a single
call.
fixes:
- |
Fixed the issue that panko-expire is consuming too much memory during
events cleaning up.