Implement Locking for Periodic Sync tasks

To avoid overlapping of one sync task with another, lock the
periodic task till one run of task compeltes.

Whenever a periodic sync job starts, It acquires a DB lock and
releases only when that task is finished.
If in meantime other sync tasks starts, It tries to acquire lock,
If it fails to acquire lock then that run of sync task does not do
any rebalancing, it is skipped.
The lock is based on the type of sync job running.
Eg: quota_sync, image_sync.
One quota_sync type of job can only conflict with another quota_sync
type job but cant be with any other type of job.

The lock is based on retry with timeout.
Add sync_lock table to DB with engine_id, timer_lock(a message)
and task_type as columns.

Add UTs for the same.

Change-Id: I9d56caae53a779dc89179d5cd73878058f16bb4f
This commit is contained in:
Ashish Singh 2016-03-15 20:12:49 +05:30
parent 6390579b93
commit 0a22b45205
15 changed files with 434 additions and 11 deletions

View File

@ -6,6 +6,7 @@ namespace = kingbird.common.manager
namespace = kingbird.common.baserpc
namespace = kingbird.db.base
namespace = kingbird.engine.engine_config
namespace = kingbird.engine.kingbird_lock
namespace = kingbird.engine.quota_manager
namespace = kingbird.engine.service
namespace = kingbird.engine.listener

0
kingbird/db/__init__.py Executable file → Normal file
View File

View File

@ -79,3 +79,15 @@ def db_sync(engine, version=None):
def db_version(engine):
"""Display the current database version."""
return IMPL.db_version(engine)
def sync_lock_acquire(context, engine_id, task_type):
return IMPL.sync_lock_acquire(context, engine_id, task_type)
def sync_lock_release(context, task_type):
return IMPL.sync_lock_release(context, task_type)
def sync_lock_steal(context, engine_id, task_type):
return IMPL.sync_lock_steal(context, engine_id, task_type)

0
kingbird/db/base.py Executable file → Normal file
View File

View File

@ -20,6 +20,7 @@ Implementation of SQLAlchemy backend.
import sys
from oslo_config import cfg
from oslo_db import api as oslo_db_api
from oslo_db.sqlalchemy import session as db_session
from oslo_log import log as logging
@ -198,3 +199,35 @@ def db_sync(engine, version=None):
def db_version(engine):
"""Display the current database version."""
return migration.db_version(engine)
@oslo_db_api.wrap_db_retry(max_retries=3, retry_on_deadlock=True,
retry_interval=0.5, inc_retry_interval=True)
def sync_lock_acquire(context, engine_id, task_type):
lock = model_query(context, models.SyncLock). \
filter_by(task_type=task_type).all()
if not lock:
lock_ref = models.SyncLock()
lock_ref.engine_id = engine_id
lock_ref.timer_lock = "Lock Acquired for EngineId: " + engine_id
lock_ref.task_type = task_type
session = _session(context)
with session.begin():
lock_ref.save(session)
return True
return False
@oslo_db_api.wrap_db_retry(max_retries=3, retry_on_deadlock=True,
retry_interval=0.5, inc_retry_interval=True)
def sync_lock_release(context, task_type):
session = _session(context)
locks = model_query(context, models.SyncLock). \
filter_by(task_type=task_type).all()
for lock in locks:
lock.delete(session=session)
def sync_lock_steal(context, engine_id, task_type):
sync_lock_release(context, task_type)
return sync_lock_acquire(context, engine_id, task_type)

0
kingbird/db/sqlalchemy/migrate_repo/manage.py Executable file → Normal file
View File

View File

@ -0,0 +1,37 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy
def upgrade(migrate_engine):
meta = sqlalchemy.MetaData()
meta.bind = migrate_engine
sync_lock = sqlalchemy.Table(
'sync_lock', meta,
sqlalchemy.Column('id', sqlalchemy.Integer,
primary_key=True, nullable=False),
sqlalchemy.Column('timer_lock', sqlalchemy.String(length=255),
nullable=False),
sqlalchemy.Column('created_at', sqlalchemy.DateTime),
sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
sqlalchemy.Column('deleted_at', sqlalchemy.DateTime),
sqlalchemy.Column('deleted', sqlalchemy.Integer),
sqlalchemy.Column('engine_id', sqlalchemy.String(length=36),
nullable=False),
sqlalchemy.Column('task_type', sqlalchemy.String(length=36),
nullable=False),
mysql_engine='InnoDB',
mysql_charset='utf8'
)
sync_lock.create()

View File

@ -75,3 +75,21 @@ class Quota(BASE, KingbirdBase):
resource = Column(String(255), nullable=False)
hard_limit = Column(Integer, nullable=False)
class SyncLock(BASE, KingbirdBase):
"""Store locks to avoid overlapping of projects
syncing during automatic periodic sync jobs with
multiple-engines.
"""
__tablename__ = 'sync_lock'
id = Column(Integer, primary_key=True)
engine_id = Column(String(36), nullable=False)
timer_lock = Column(String(255), nullable=False)
task_type = Column(String(36), nullable=False)

View File

@ -0,0 +1,105 @@
# Copyright 2016 Ericsson AB
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_log import log as logging
from kingbird.common.i18n import _
from kingbird.common.i18n import _LE
from kingbird.common.i18n import _LI
from kingbird.db import api as db_api
from kingbird.engine import scheduler
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
lock_opts = [
cfg.IntOpt('lock_retry_times',
default=3,
help=_('Number of times trying to grab a lock.')),
cfg.IntOpt('lock_retry_interval',
default=10,
help=_('Number of seconds between lock retries.'))
]
lock_opts_group = cfg.OptGroup('locks')
cfg.CONF.register_group(lock_opts_group)
cfg.CONF.register_opts(lock_opts, group=lock_opts_group)
def sync_lock_acquire(context, engine_id, task_type, forced=False):
"""Try to lock with specified engine_id.
:param engine: ID of the engine which wants to lock the projects.
:returns: True if lock is acquired, or False otherwise.
"""
# Step 1: try lock the projects- if it returns True then success
LOG.info(_LI('Trying to acquire lock with %(engId)s for Task: %(task)s'),
{'engId': engine_id,
'task': task_type
}
)
lock_status = db_api.sync_lock_acquire(context, engine_id, task_type)
if lock_status:
return True
# Step 2: retry using global configuration options
retries = cfg.CONF.locks.lock_retry_times
retry_interval = cfg.CONF.locks.lock_retry_interval
while retries > 0:
scheduler.sleep(retry_interval)
LOG.info(_LI('Retry acquire lock with %(engId)s for Task: %(task)s'),
{'engId': engine_id,
'task': task_type
}
)
lock_status = db_api.sync_lock_acquire(context, engine_id, task_type)
if lock_status:
return True
retries = retries - 1
# Step 3: Last resort is 'forced locking', only needed when retry failed
if forced:
lock_status = db_api.sync_lock_steal(context, engine_id, task_type)
if not lock_status:
return False
else:
return True
# Will reach here only when not able to acquire locks with retry
LOG.error(_LE('Not able to acquire lock for %(task)s with retry'
' with engineId %(engId)s'),
{'engId': engine_id,
'task': task_type
}
)
return False
def sync_lock_release(context, engine_id, task_type):
"""Release the lock for the projects"""
LOG.info(_LI('Releasing acquired lock with %(engId)s for Task: %(task)s'),
{'engId': engine_id,
'task': task_type
}
)
return db_api.sync_lock_release(context, task_type)
def list_opt():
yield lock_opts_group.name, lock_opts

View File

@ -13,10 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import uuid
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
import time
from kingbird.common.i18n import _
from kingbird.common.i18n import _LI
@ -48,6 +50,7 @@ class EngineManager(manager.Manager):
target = messaging.Target(version='1.0')
def __init__(self, *args, **kwargs):
self.engine_id = str(uuid.uuid4())
self.qm = QuotaManager()
self.TG = scheduler.ThreadGroupManager()
self.periodic_enable = cfg.CONF.scheduler.periodic_enable
@ -59,7 +62,7 @@ class EngineManager(manager.Manager):
if self.periodic_enable:
LOG.debug("Adding periodic tasks for the engine to perform")
self.TG.add_timer(self.periodic_interval,
self.periodic_balance_all)
self.periodic_balance_all, None, self.engine_id)
def init_host(self):
LOG.debug(_('Engine init_host...'))
@ -81,11 +84,11 @@ class EngineManager(manager.Manager):
pass
def periodic_balance_all(self):
def periodic_balance_all(self, engine_id):
# Automated Quota Sync for all the keystone projects
LOG.info(_LI("Periodic quota sync job started at: %s"),
time.strftime("%c"))
self.qm.periodic_balance_all()
self.qm.periodic_balance_all(engine_id)
def quota_sync_for_project(self, ctx, project_id):
# On Demand Quota Sync for a project, will be triggered by KB-API

View File

@ -17,6 +17,7 @@ import collections
from Queue import Queue
import re
import threading
import time
from oslo_config import cfg
from oslo_log import log as logging
@ -32,6 +33,7 @@ from kingbird.common import manager
from kingbird.common import utils
from kingbird.db import api as db_api
from kingbird.drivers.openstack import sdk
from kingbird.engine import kingbird_lock
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
@ -47,6 +49,7 @@ batch_opts = [
batch_opt_group = cfg.OptGroup('batch')
cfg.CONF.register_group(batch_opt_group)
cfg.CONF.register_opts(batch_opts, group=batch_opt_group)
TASK_TYPE = 'quota_sync'
class QuotaManager(manager.Manager):
@ -60,8 +63,19 @@ class QuotaManager(manager.Manager):
self.context = context.get_admin_context()
self.endpoints = endpoint_cache.EndpointCache()
def periodic_balance_all(self):
def periodic_balance_all(self, engine_id):
LOG.info(_LI("periodically balance quota for all keystone tenants"))
lock = kingbird_lock.sync_lock_acquire(self.context, engine_id,
TASK_TYPE)
if not lock:
LOG.error(_LE("Not able to acquire lock for %(task_type)s, may"
" be Previous sync job has not finished yet, "
"Aborting this run at: %(time)s "),
{'task_type': TASK_TYPE,
'time': time.strftime("%c")}
)
return
LOG.info(_LI("Successfully acquired lock"))
projects_thread_list = []
# Iterate through project list and call sync project for each project
# using threads
@ -83,11 +97,15 @@ class QuotaManager(manager.Manager):
# the job(sync all projects quota)
for current_thread in projects_thread_list:
current_thread.join()
kingbird_lock.sync_lock_release(self.context, engine_id, TASK_TYPE)
def read_quota_usage(self, project_id, region, usage_queue):
# Writes usage dict to the Queue in the following format
# {'region_name': (<nova_usages>, <neutron_usages>, <cinder_usages>)}
LOG.info(_LI("Reading quota usage for project: %s"), project_id)
LOG.info(_LI("Reading quota usage for %(project_id)s in %(region)s"),
{'project_id': project_id,
'region': region}
)
os_client = sdk.OpenStackDriver(region)
region_usage = os_client.get_resource_usages(project_id)
total_region_usage = collections.defaultdict(dict)

View File

@ -0,0 +1,106 @@
# Copyright (c) 2015 Ericsson AB
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy
from oslo_config import cfg
from oslo_db import options
from kingbird.db import api as api
from kingbird.db.sqlalchemy import api as db_api
from kingbird.tests import base
from kingbird.tests import utils
get_engine = api.get_engine
UUID1 = utils.UUID1
FAKE_TASK_TYPE = 'fake_sync'
FAKE_TASK_TYPE_2 = 'fake_sync2'
class DBAPISyncLockTest(base.KingbirdTestCase):
def setup_dummy_db(self):
options.cfg.set_defaults(options.database_opts,
sqlite_synchronous=False)
options.set_defaults(cfg.CONF, connection="sqlite://",
sqlite_db='kingbird.db')
engine = get_engine()
db_api.db_sync(engine)
engine.connect()
def reset_dummy_db(self):
engine = get_engine()
meta = sqlalchemy.MetaData()
meta.reflect(bind=engine)
for table in reversed(meta.sorted_tables):
if table.name == 'migrate_version':
continue
engine.execute(table.delete())
def acquire_lock(self, ctxt, FAKE_TASK_TYPE):
return db_api.sync_lock_acquire(ctxt, utils.UUID1, FAKE_TASK_TYPE)
def release_lock(self, ctxt, FAKE_TASK_TYPE):
return db_api.sync_lock_release(ctxt, FAKE_TASK_TYPE)
def steal_lock(self, ctxt, FAKE_TASK_TYPE):
return db_api.sync_lock_steal(ctxt, utils.UUID1, FAKE_TASK_TYPE)
def setUp(self):
super(DBAPISyncLockTest, self).setUp()
self.setup_dummy_db()
self.addCleanup(self.reset_dummy_db)
self.ctxt = utils.dummy_context()
def test_sync_lock_acquire(self):
expected_value = self.acquire_lock(self.ctxt, FAKE_TASK_TYPE)
self.assertEqual(expected_value, True)
self.release_lock(self.ctxt, FAKE_TASK_TYPE)
def test_sync_lock_release(self):
self.acquire_lock(self.ctxt, FAKE_TASK_TYPE)
self.release_lock(self.ctxt, FAKE_TASK_TYPE)
# Lock is released, Now check If we can acquire a lock
lock = self.acquire_lock(self.ctxt, FAKE_TASK_TYPE)
self.assertEqual(lock, True)
self.release_lock(self.ctxt, FAKE_TASK_TYPE)
def test_sync_lock_acquire_fail_same_task_type(self):
self.acquire_lock(self.ctxt, FAKE_TASK_TYPE)
second_lock = self.acquire_lock(self.ctxt, FAKE_TASK_TYPE)
# Lock cannot be acquired for second time as it is not released
self.assertEqual(second_lock, False)
self.release_lock(self.ctxt, FAKE_TASK_TYPE)
def test_sync_lock_steal(self):
expected_value = self.steal_lock(self.ctxt, FAKE_TASK_TYPE)
self.assertEqual(expected_value, True)
self.release_lock(self.ctxt, FAKE_TASK_TYPE)
def test_sync_lock_steal_with_allready_acquired_lock(self):
self.acquire_lock(self.ctxt, FAKE_TASK_TYPE)
steal_lock = self.steal_lock(self.ctxt, FAKE_TASK_TYPE)
self.assertEqual(steal_lock, True)
self.release_lock(self.ctxt, FAKE_TASK_TYPE)
def test_sync_lock_acquire_with_different_task_type(self):
expected_value = self.acquire_lock(self.ctxt, FAKE_TASK_TYPE)
expected_value_2 = self.acquire_lock(self.ctxt, FAKE_TASK_TYPE_2)
self.assertEqual(expected_value, True)
self.assertEqual(expected_value_2, True)
self.release_lock(self.ctxt, FAKE_TASK_TYPE)
self.release_lock(self.ctxt, FAKE_TASK_TYPE_2)

View File

@ -0,0 +1,65 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import uuid
from oslo_config import cfg
from kingbird.common import config
from kingbird.engine import kingbird_lock
from kingbird.tests import base
from kingbird.tests import utils
config.register_options()
FAKE_ENGINE_ID = str(uuid.uuid4())
FAKE_TASK_TYPE = 'fake_sync'
cfg.CONF.import_group("locks", "kingbird.engine.kingbird_lock")
cfg.CONF.import_group("locks", "kingbird.engine.kingbird_lock")
cfg.CONF.set_override('lock_retry_times', 2, group='locks')
cfg.CONF.set_override('lock_retry_interval', 1, group='locks')
class TestKingbirdLock(base.KingbirdTestCase):
def setUp(self):
super(TestKingbirdLock, self).setUp()
self.context = utils.dummy_context()
@mock.patch.object(kingbird_lock, 'db_api')
def test_sync_lock_acquire(self, mock_db_api):
mock_db_api.sync_lock_acquire.return_value = "Fake Record"
expected_value = kingbird_lock.sync_lock_acquire(
self.context, FAKE_ENGINE_ID, FAKE_TASK_TYPE)
self.assertEqual(expected_value, True)
@mock.patch.object(kingbird_lock, 'db_api')
def test_sync_lock_acquire_force_yes(self, mock_db_api):
mock_db_api.sync_lock_acquire.return_value = False
mock_db_api.db_api.sync_lock_steal.return_value = True
expected_value = kingbird_lock.sync_lock_acquire(
self.context, FAKE_ENGINE_ID, FAKE_TASK_TYPE, True)
self.assertEqual(expected_value, True)
@mock.patch.object(kingbird_lock, 'db_api')
def test_sync_lock_release(self, mock_db_api):
kingbird_lock.sync_lock_release(self.context, FAKE_ENGINE_ID,
FAKE_TASK_TYPE)
mock_db_api.sync_lock_release.assert_called_once_with(self.context,
FAKE_TASK_TYPE)
@mock.patch.object(kingbird_lock, 'db_api')
def test_sync_lock_acquire_fail(self, mock_db_api):
mock_db_api.sync_lock_acquire.return_value = False
expected_value = kingbird_lock.sync_lock_acquire(
self.context, FAKE_ENGINE_ID, FAKE_TASK_TYPE)
self.assertEqual(expected_value, False)

View File

@ -11,12 +11,14 @@
# under the License.
import mock
import uuid
from kingbird.engine import listener
from kingbird.tests import base
from kingbird.tests import utils
FAKE_PROJECT = 'fake_project'
FAKE_ENGINE_ID = str(uuid.uuid4())
class TestEngineManager(base.KingbirdTestCase):
@ -33,8 +35,8 @@ class TestEngineManager(base.KingbirdTestCase):
@mock.patch.object(listener, 'QuotaManager')
def test_periodic_balance_all(self, mock_qm):
engine_manager = listener.EngineManager()
engine_manager.periodic_balance_all()
mock_qm().periodic_balance_all.assert_called_once_with()
engine_manager.periodic_balance_all(FAKE_ENGINE_ID)
mock_qm().periodic_balance_all.assert_called_once_with(FAKE_ENGINE_ID)
@mock.patch.object(listener, 'QuotaManager')
def test_quota_sync_for_project(self, mock_qm):

View File

@ -12,6 +12,7 @@
from collections import Counter
import mock
from Queue import Queue
import uuid
from oslo_config import cfg
@ -23,6 +24,7 @@ from kingbird.tests import utils
CONF = cfg.CONF
FAKE_PROJECT = 'fake_project'
FAKE_REGION = 'fake_region'
FAKE_ENGINE_ID = str(uuid.uuid4())
NOVA_USAGE = {'ram': 100, 'cores': '50'}
NEUTRON_USAGE = {'port': 10}
CINDER_USAGE = {'volumes': 18}
@ -32,6 +34,7 @@ TOTAL_USAGE = {}
TOTAL_USAGE.update(NOVA_USAGE)
TOTAL_USAGE.update(NEUTRON_USAGE)
TOTAL_USAGE.update(CINDER_USAGE)
TASK_TYPE = 'quota_sync'
class TestQuotaManager(base.KingbirdTestCase):
@ -49,16 +52,23 @@ class TestQuotaManager(base.KingbirdTestCase):
self.assertEqual('localhost', qm.host)
self.assertEqual(self.ctxt, qm.context)
@mock.patch.object(quota_manager, 'context')
@mock.patch.object(quota_manager.QuotaManager, 'quota_sync_for_project')
@mock.patch.object(quota_manager, 'sdk')
@mock.patch.object(quota_manager, 'endpoint_cache')
def test_periodic_balance_all(self, mock_endpoint,
mock_sdk, mock_quota_sync):
@mock.patch.object(quota_manager, 'kingbird_lock')
def test_periodic_balance_all(self, mock_kb_lock, mock_endpoint,
mock_sdk, mock_quota_sync, mock_context):
mock_context.get_admin_context.return_value = self.ctxt
mock_sdk.OpenStackDriver().get_enabled_projects.return_value = \
['proj1']
mock_kb_lock.sync_lock_acquire.return_value = True
qm = quota_manager.QuotaManager()
qm.periodic_balance_all()
qm.periodic_balance_all(FAKE_ENGINE_ID)
mock_quota_sync.assert_called_with('proj1')
mock_kb_lock.sync_lock_release.assert_called_once_with(self.ctxt,
FAKE_ENGINE_ID,
TASK_TYPE)
@mock.patch.object(quota_manager, 'sdk')
@mock.patch.object(quota_manager, 'endpoint_cache')
@ -200,3 +210,16 @@ class TestQuotaManager(base.KingbirdTestCase):
qm = quota_manager.QuotaManager()
qm.quota_sync_for_project(FAKE_PROJECT)
mock_update.assert_not_called()
@mock.patch.object(quota_manager.QuotaManager, 'quota_sync_for_project')
@mock.patch.object(quota_manager, 'sdk')
@mock.patch.object(quota_manager, 'endpoint_cache')
@mock.patch.object(quota_manager, 'kingbird_lock')
def test_periodic_balance_all_lock_fail(self, mock_kb_lock, mock_endpoint,
mock_sdk, mock_quota_sync):
mock_sdk.OpenStackDriver().get_enabled_projects.return_value = \
['proj1']
mock_kb_lock.sync_lock_acquire.return_value = False
qm = quota_manager.QuotaManager()
qm.periodic_balance_all(FAKE_ENGINE_ID)
mock_quota_sync.assert_not_called()