Add db purge support

Added db purge support to delete soft-deleted records from tables.
For the 'hosts' and 'failover_segments' tables, the decision to purge
a record is based on the values of its 'deleted' and 'deleted_at'
columns. For the 'notifications' table, it is based on the value of
the 'updated_at' column instead, because notifications don't have a
'delete' API.

Operator can use the below command to purge the records:

"masakari-manage db purge --age_in_days <days> --max_rows <rows>"

Implements: blueprint db-purge-support

Co-author: Pooja Jadhav <pooja.jadhav@nttdata.com>
Change-Id: Id959b0e2b84d5d2587d6b70b8f695f69634a1d17
This commit is contained in:
pooja jadhav 2017-07-10 18:19:07 +05:30 committed by Pooja Jadhav
parent b2676ae2e6
commit 4048b1fd8e
7 changed files with 395 additions and 4 deletions

View File

@ -21,16 +21,21 @@
import logging as python_logging
import sys
import time
from oslo_config import cfg
from oslo_db.sqlalchemy import migration
from oslo_log import log as logging
import six
import masakari.conf
from masakari import context
from masakari import db
from masakari.db import api as db_api
from masakari.db.sqlalchemy import migration as db_migration
from masakari import exception
from masakari.i18n import _
from masakari import utils
from masakari import version
@ -76,6 +81,35 @@ class DbCommands(object):
db_migration.MIGRATE_REPO_PATH,
db_migration.INIT_VERSION))
@args('--age_in_days', type=int, default=30,
      help='Purge deleted rows older than age in days (default: '
           '%(default)d)')
@args('--max_rows', type=int, default=-1,
      help='Limit number of records to delete (default: %(default)d)')
def purge(self, age_in_days, max_rows):
    """Purge rows older than a given age from masakari tables.

    Both arguments are validated first; any invalid value terminates the
    command through ``sys.exit`` with an explanatory message.

    :param age_in_days: minimum age, in days, of the rows to purge; must
                        be a non-negative integer smaller than the number
                        of days elapsed since the epoch
    :param max_rows: upper bound on the number of rows to delete; must lie
                     between -1 and ``db.MAX_INT`` and must not be 0
    """
    # Range-check max_rows before anything else so that its error message
    # takes precedence when several arguments are invalid.
    try:
        max_rows = utils.validate_integer(
            max_rows, 'max_rows', -1, db.MAX_INT)
    except exception.Invalid as invalid:
        sys.exit(six.text_type(invalid))

    try:
        age_in_days = int(age_in_days)
    except ValueError:
        sys.exit(six.text_type(
            'Invalid value for age, %(age)s' % {'age': age_in_days}))

    if max_rows == 0:
        sys.exit(_("Must supply value greater than 0 for max_rows."))

    if age_in_days < 0:
        sys.exit(_("Must supply a non-negative value for age."))

    # An age reaching back to (or beyond) the epoch is rejected.
    days_since_epoch = int(time.time()) / 86400
    if age_in_days >= days_since_epoch:
        sys.exit(_("Maximal age is count of days since epoch."))

    db_api.purge_deleted_rows(
        context.get_admin_context(), age_in_days, max_rows)
CATEGORIES = {
'db': DbCommands,
@ -115,10 +149,10 @@ def add_command_parsers(subparsers):
parser.set_defaults(action_kwargs=action_kwargs)
CONF.register_cli_opt(cfg.SubCommandOpt('category',
title='Command categories',
help='Available categories',
handler=add_command_parsers))
command_opt = cfg.SubCommandOpt('category',
title='Command categories',
help='Available categories',
handler=add_command_parsers)
def get_arg_string(args):
@ -149,6 +183,7 @@ def fetch_func_args(func):
def main():
"""Parse options and call the appropriate class/method."""
CONF.register_cli_opt(command_opt)
script_name = sys.argv[0]
if len(sys.argv) < 2:
print(_("\nOpenStack masakari version: %(version)s\n") %

View File

@ -361,3 +361,13 @@ def notification_delete(context, notification_uuid):
'notification_uuid' doesn't exist
"""
return IMPL.notification_delete(context, notification_uuid)
def purge_deleted_rows(context, age_in_days, max_rows):
    """Purge the soft deleted rows.

    Forwards the call unchanged to the configured database backend.

    :param context: context to query under
    :param age_in_days: only rows older than this many days are purged
    :param max_rows: limit on the number of records to delete
    :returns: whatever the backend implementation returns
    """
    backend_purge = IMPL.purge_deleted_rows
    return backend_purge(context, age_in_days, max_rows)

View File

@ -14,14 +14,21 @@
# under the License.
"""Implementation of SQLAlchemy backend."""
import datetime
import sys
from oslo_db import api as oslo_db_api
from oslo_db import exception as db_exc
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import utils as sqlalchemyutils
from oslo_log import log as logging
from oslo_utils import timeutils
from sqlalchemy import or_, and_
from sqlalchemy.ext.compiler import compiles
from sqlalchemy import MetaData
from sqlalchemy.orm import joinedload
from sqlalchemy import sql
import sqlalchemy.sql as sa_sql
from sqlalchemy.sql import func
import masakari.conf
@ -29,6 +36,7 @@ from masakari.db.sqlalchemy import models
from masakari import exception
from masakari.i18n import _
LOG = logging.getLogger(__name__)
CONF = masakari.conf.CONF
@ -617,3 +625,76 @@ def notification_delete(context, notification_uuid):
if count == 0:
raise exception.NotificationNotFound(id=notification_uuid)
class DeleteFromSelect(sa_sql.expression.UpdateBase):
    """SQL construct holding a table, a select and a column of that table.

    :param table: table the statement targets
    :param select: select statement stored on the construct
    :param column: column stored on the construct
    """

    def __init__(self, table, select, column):
        self.table, self.select, self.column = table, select, column
# NOTE(pooja_jadhav): MySQL doesn't yet support a subquery that combines
# 'LIMIT' with 'IN/ALL/ANY/SOME'. We need to work around this by nesting
# the select inside another select.
@compiles(DeleteFromSelect)
def visit_delete_from_select(element, compiler, **kw):
    """Compile a ``DeleteFromSelect`` construct into a DELETE statement.

    The element's select is wrapped in an outer ``SELECT ... FROM (...) as
    T1`` so that the ``IN`` clause never sees it directly.

    :param element: the ``DeleteFromSelect`` instance being compiled
    :param compiler: SQL compiler used to render the sub-elements
    :returns: the DELETE statement as a string
    """
    table_sql = compiler.process(element.table, asfrom=True)
    column_sql = compiler.process(element.column)
    column_name = element.column.name
    select_sql = compiler.process(element.select)
    template = "DELETE FROM %s WHERE %s in (SELECT T1.%s FROM (%s) as T1)"
    return template % (table_sql, column_sql, column_name, select_sql)
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@main_context_manager.writer
def purge_deleted_rows(context, age_in_days, max_rows):
    """Purge soft-deleted rows older than ``age_in_days``.

    Every reflected table that has a ``deleted`` column is processed, in
    reverse of ``MetaData.sorted_tables`` order so that tables holding
    foreign keys are handled before the tables they reference.

    * For the ``notifications`` table, rows are selected when
      ``updated_at`` is older than the cut-off and ``status`` is one of
      ``finished``, ``failed`` or ``ignored`` (notifications are never
      soft-deleted).
    * For every other table, rows are selected when ``deleted_at`` is
      older than the cut-off.

    :param context: request context (consumed by the enginefacade
                    decorator; not used directly here)
    :param age_in_days: rows older than this many days are purged
    :param max_rows: total number of rows to delete across all tables;
                     a value that is not greater than 0 means no limit
    """
    engine = get_engine()
    metadata = MetaData()
    metadata.reflect(engine)
    deleted_age = timeutils.utcnow() - datetime.timedelta(days=age_in_days)
    total_rows_purged = 0

    conn = engine.connect()
    try:
        for table in reversed(metadata.sorted_tables):
            if 'deleted' not in table.columns.keys():
                continue
            LOG.info('Purging deleted rows older than %(age_in_days)d '
                     'day(s) from table %(tbl)s',
                     {'age_in_days': age_in_days, 'tbl': table})
            column = table.c.id

            # Look up only the columns the chosen branch needs, so a table
            # lacking the other column does not fail with AttributeError.
            if table.name == 'notifications':
                status_column = table.c.status
                updated_at_column = table.c.updated_at
                query_delete = sql.select([column]).where(
                    and_(updated_at_column < deleted_age, or_(
                        status_column == 'finished',
                        status_column == 'failed',
                        status_column == 'ignored'))).order_by(
                    status_column)
            else:
                deleted_at_column = table.c.deleted_at
                query_delete = sql.select([column]).where(
                    deleted_at_column < deleted_age).order_by(
                    deleted_at_column)

            if max_rows > 0:
                # Only the budget left over from previous tables remains.
                query_delete = query_delete.limit(
                    max_rows - total_rows_purged)

            delete_statement = DeleteFromSelect(table, query_delete, column)
            result = conn.execute(delete_statement)

            rows = result.rowcount
            LOG.info('Deleted %(rows)d row(s) from table %(tbl)s',
                     {'rows': rows, 'tbl': table})

            total_rows_purged += rows
            if max_rows > 0 and total_rows_purged == max_rows:
                break
    finally:
        # Always hand the connection back, even when a delete fails.
        conn.close()

    LOG.info('Total deleted rows are %(rows)d', {'rows': total_rows_purged})

View File

@ -315,6 +315,18 @@ def check_config_option_in_central_place(logical_line, filename):
if "masakari/conf/" in filename:
return
# (pooja_jadhav) All config options (with exceptions that are clarified
# in the list below) were moved to the central place. List below is for
# all options that were impossible to move without doing a major impact
# on code. Add full path to a module or folder.
conf_exceptions = [
# CLI opts are allowed to be outside of masakari/conf directory
'masakari/cmd/manage.py',
]
if any(f in filename for f in conf_exceptions):
return
if cfg_opt_re.match(logical_line):
yield(0, msg)

View File

@ -0,0 +1,167 @@
# Copyright 2017 NTT DATA
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for db purge."""
import datetime
import uuid
from oslo_db.sqlalchemy import utils as sqlalchemyutils
from oslo_utils import timeutils
from sqlalchemy.dialects import sqlite
from masakari import context
from masakari import db
from masakari.db.sqlalchemy import api as db_api
from masakari import test
class PurgeDeletedTest(test.TestCase):
    """Exercise ``db.purge_deleted_rows`` against populated tables.

    ``setUp`` inserts six rows into each of the ``notifications``,
    ``failover_segments`` and ``hosts`` tables and then back-dates four of
    them per table: two to 20 days ago and two to 60 days ago.
    """

    def setUp(self):
        """Populate the three tables and back-date a subset of the rows."""
        super(PurgeDeletedTest, self).setUp()
        self.context = context.get_admin_context()
        self.engine = db_api.get_engine()
        self.conn = self.engine.connect()
        self.notifications = sqlalchemyutils.get_table(
            self.engine, "notifications")
        self.failover_segments = sqlalchemyutils.get_table(
            self.engine, "failover_segments")
        # The hosts table has a FK of segment_id
        self.hosts = sqlalchemyutils.get_table(
            self.engine, "hosts")

        # Add 6 rows to each table
        self.uuidstrs = []
        self.uuid_fs_segments = []
        self.uuid_hosts = []
        for record in range(6):
            notification_uuid = uuid.uuid4().hex
            fs_segment_uuid = uuid.uuid4().hex
            host_uuid = uuid.uuid4().hex
            # Notifications are created with status 'failed', one of the
            # statuses the purge considers for the notifications table.
            ins_stmt = self.notifications.insert().values(
                notification_uuid=notification_uuid,
                generated_time=timeutils.utcnow(),
                source_host_uuid=host_uuid,
                type='demo',
                status='failed')
            self.uuidstrs.append(notification_uuid)
            self.conn.execute(ins_stmt)

            ins_stmt = self.failover_segments.insert().values(
                uuid=fs_segment_uuid,
                name='test',
                service_type='demo',
                recovery_method='auto')
            self.uuid_fs_segments.append(fs_segment_uuid)
            self.conn.execute(ins_stmt)

            ins_stmt = self.hosts.insert().values(
                uuid=host_uuid,
                failover_segment_id=fs_segment_uuid,
                name='host1',
                type='demo',
                control_attributes='test')
            self.uuid_hosts.append(host_uuid)
            self.conn.execute(ins_stmt)

        # Back-date 4 rows per table: rows [1:3] to 20 days ago and rows
        # [4:6] to 60 days ago. Notifications are aged through updated_at,
        # segments and hosts through deleted_at.
        self.age_in_days_20 = timeutils.utcnow() - datetime.timedelta(days=20)
        self.age_in_days_60 = timeutils.utcnow() - datetime.timedelta(days=60)

        make_notifications_old = self.notifications.update().where(
            self.notifications.c.notification_uuid.in_(
                self.uuidstrs[1:3])).values(updated_at=self.age_in_days_20)
        make_notifications_older = self.notifications.update().where(
            self.notifications.c.notification_uuid.in_(
                self.uuidstrs[4:6])).values(updated_at=self.age_in_days_60)
        make_failover_segments_old = self.failover_segments.update().where(
            self.failover_segments.c.uuid.in_(
                self.uuid_fs_segments[1:3])).values(
            deleted_at=self.age_in_days_20)
        make_failover_segments_older = self.failover_segments.update().where(
            self.failover_segments.c.uuid.in_(
                self.uuid_fs_segments[4:6])).values(
            deleted_at=self.age_in_days_60)
        make_hosts_old = self.hosts.update().where(
            self.hosts.c.uuid.in_(self.uuid_hosts[1:3])).values(
            deleted_at=self.age_in_days_20)
        make_hosts_older = self.hosts.update().where(
            self.hosts.c.uuid.in_(self.uuid_hosts[4:6])).values(
            deleted_at=self.age_in_days_60)

        self.conn.execute(make_notifications_old)
        self.conn.execute(make_notifications_older)
        self.conn.execute(make_failover_segments_old)
        self.conn.execute(make_failover_segments_older)
        self.conn.execute(make_hosts_old)
        self.conn.execute(make_hosts_older)

        dialect = self.engine.url.get_dialect()
        if dialect == sqlite.dialect:
            # We're seeing issues with foreign key support in SQLite 3.6.20
            # SQLAlchemy doesn't support it at all with < SQLite 3.6.19
            # It works fine in SQLite 3.7.
            # Force foreign_key checking if running SQLite >= 3.7
            import sqlite3
            tup = sqlite3.sqlite_version_info
            if tup[0] > 3 or (tup[0] == 3 and tup[1] >= 7):
                self.conn.execute("PRAGMA foreign_keys = ON")

    def test_purge_deleted_rows_old(self):
        """Only the 60-day-old rows are purged with a 30-day threshold."""
        # Purge at 30 days old, should only delete 2 rows per table
        db.purge_deleted_rows(self.context, age_in_days=30, max_rows=10)

        notifications_rows = self.conn.execute(
            self.notifications.count()).scalar()
        failover_segments_rows = self.conn.execute(
            self.failover_segments.count()).scalar()
        hosts_rows = self.conn.execute(self.hosts.count()).scalar()

        # Verify that we only deleted 2 rows from each table
        self.assertEqual(4, notifications_rows)
        self.assertEqual(4, failover_segments_rows)
        self.assertEqual(4, hosts_rows)

    def test_purge_all_deleted_rows(self):
        """All four back-dated rows per table are purged with no limit."""
        # NOTE(review): the 20-day rows qualify because they were stamped
        # in setUp, slightly before the cut-off computed by the purge.
        db.purge_deleted_rows(self.context, age_in_days=20, max_rows=-1)

        notifications_rows = self.conn.execute(
            self.notifications.count()).scalar()
        failover_segments_rows = self.conn.execute(
            self.failover_segments.count()).scalar()
        hosts_rows = self.conn.execute(self.hosts.count()).scalar()

        # Verify that we have purged all deleted rows
        self.assertEqual(2, notifications_rows)
        self.assertEqual(2, failover_segments_rows)
        self.assertEqual(2, hosts_rows)

    def test_purge_maximum_rows_partial_deleted_records(self):
        """``max_rows`` caps the total number of rows purged across tables."""
        db.purge_deleted_rows(self.context, age_in_days=60, max_rows=3)

        notifications_rows = self.conn.execute(
            self.notifications.count()).scalar()
        failover_segments_rows = self.conn.execute(
            self.failover_segments.count()).scalar()
        hosts_rows = self.conn.execute(self.hosts.count()).scalar()

        # Verify that we have deleted 3 rows only: 2 notifications and
        # 1 host; the limit is reached before any segment is purged.
        self.assertEqual(4, notifications_rows)
        self.assertEqual(5, hosts_rows)
        self.assertEqual(6, failover_segments_rows)

View File

@ -0,0 +1,75 @@
# Copyright 2017 NTT DATA
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import sys
from masakari.cmd import manage
from masakari import context
from masakari.db import api as db_api
from masakari import test
class DBCommandsTestCase(test.TestCase):
    """Unit tests for the argument validation of ``DbCommands.purge``."""

    def setUp(self):
        """Create the command object and a fixed ``sys.argv``."""
        super(DBCommandsTestCase, self).setUp()
        self.commands = manage.DbCommands()
        self.context = context.get_admin_context()
        # sys.argv is process-global: restore the original value after the
        # test so the override does not leak into other test cases.
        self.addCleanup(setattr, sys, 'argv', sys.argv)
        sys.argv = ['masakari-manage']

    @mock.patch.object(db_api, 'purge_deleted_rows')
    @mock.patch.object(context, 'get_admin_context')
    def test_purge_command(self, mock_context, mock_db_purge):
        """Valid arguments are forwarded to the db API unchanged."""
        mock_context.return_value = self.context
        self.commands.purge(0, 100)
        mock_db_purge.assert_called_once_with(self.context, 0, 100)

    def test_purge_negative_age_in_days(self):
        """A negative age makes the command exit with an error message."""
        ex = self.assertRaises(SystemExit, self.commands.purge, -1, 100)
        self.assertEqual("Must supply a non-negative value for age.", ex.code)

    def test_purge_invalid_age_in_days(self):
        """A non-numeric age makes the command exit with an error message."""
        ex = self.assertRaises(SystemExit, self.commands.purge, "test", 100)
        self.assertEqual("Invalid value for age, test", ex.code)

    def test_purge_command_exceeded_age_in_days(self):
        """An age reaching past the epoch is rejected."""
        ex = self.assertRaises(SystemExit, self.commands.purge, 1000000, 50)
        self.assertEqual("Maximal age is count of days since epoch.", ex.code)

    def test_purge_invalid_max_rows(self):
        """``max_rows`` of 0 is rejected."""
        ex = self.assertRaises(SystemExit, self.commands.purge, 0, 0)
        self.assertEqual("Must supply value greater than 0 for max_rows.",
                         ex.code)

    def test_purge_negative_max_rows(self):
        """``max_rows`` below -1 is rejected by the integer validation."""
        ex = self.assertRaises(SystemExit, self.commands.purge, 0, -5)
        self.assertEqual("Invalid input received: max_rows must be >= -1",
                         ex.code)

    @mock.patch.object(db_api, 'purge_deleted_rows')
    @mock.patch.object(context, 'get_admin_context')
    def test_purge_max_rows(self, mock_context, mock_db_purge):
        """The largest accepted ``max_rows`` value is passed through."""
        mock_context.return_value = self.context
        value = (2 ** 31) - 1
        self.commands.purge(age_in_days=1, max_rows=value)
        mock_db_purge.assert_called_once_with(self.context, 1, value)

    def test_purge_command_exceeded_maximum_rows(self):
        """``max_rows`` one above the maximum is rejected."""
        # value(2 ** 31) is greater than max_rows(2147483647) by 1.
        value = 2 ** 31
        ex = self.assertRaises(SystemExit, self.commands.purge, age_in_days=1,
                               max_rows=value)
        expected = "Invalid input received: max_rows must be <= 2147483647"
        self.assertEqual(expected, ex.code)

View File

@ -0,0 +1,11 @@
---
features:
- |
Operators can now purge the soft-deleted records from the database tables.
Added below command to purge the records:
``masakari-manage db purge --age_in_days <days> --max_rows <rows>``
NOTE: ``notifications`` db records will be purged on the basis of ``updated_at``
and ``status`` fields (finished, ignored, failed) as these records will not be
automatically soft-deleted by the system.