Add waiting for the driver to SchedulerManager

This patch adds a _wait_for_scheduler method that is called before
serving any request. The method waits until scheduler.is_ready()
returns True or CONF.periodic_interval seconds have passed since
service startup.

Change-Id: I9fab9fb076a955a24c1c157229baf027359d9771
Closes-Bug: 1409012
Author: Michal Dulko
Date:   2015-03-12 17:24:09 +01:00
Parent: 9cf6e694a3
Commit: 89106c5272

3 changed files with 101 additions and 2 deletions
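
The mechanism, in miniature: the manager starts with _startup_delay set,
init_host_with_rpc lifts it after CONF.periodic_interval seconds, and every
scheduling request first spins in _wait_for_scheduler until the driver
reports readiness or the startup window closes. Below is a minimal,
self-contained sketch of that pattern, not Cinder code: FakeDriver and the
hard-coded interval are hypothetical stand-ins for the real scheduler driver
and CONF.periodic_interval.

# Minimal sketch of the startup-delay pattern this patch introduces.
import eventlet


class FakeDriver(object):
    def __init__(self):
        self.ready = False

    def is_ready(self):
        return self.ready


class ManagerSketch(object):
    PERIODIC_INTERVAL = 60  # stands in for CONF.periodic_interval

    def __init__(self):
        self.driver = FakeDriver()
        # Delay requests until the driver is ready or the window closes.
        self._startup_delay = True

    def init_host_with_rpc(self):
        # Runs once at service startup: after one periodic interval,
        # stop delaying even if no capability reports arrived.
        eventlet.sleep(self.PERIODIC_INTERVAL)
        self._startup_delay = False

    def _wait_for_scheduler(self):
        # Poll once per second until the driver is ready or the
        # startup window has elapsed.
        while self._startup_delay and not self.driver.is_ready():
            eventlet.sleep(1)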


@@ -19,6 +19,7 @@
 Scheduler Service
 """
 
+import eventlet
 from oslo_config import cfg
 from oslo_log import log as logging
 import oslo_messaging as messaging
@@ -75,11 +76,15 @@ class SchedulerManager(manager.Manager):
                             'combination of filters and weighers.'))
         self.driver = importutils.import_object(scheduler_driver)
         super(SchedulerManager, self).__init__(*args, **kwargs)
+        self._startup_delay = True
 
     def init_host_with_rpc(self):
         ctxt = context.get_admin_context()
         self.request_service_capabilities(ctxt)
 
+        eventlet.sleep(CONF.periodic_interval)
+        self._startup_delay = False
+
     def update_service_capabilities(self, context, service_name=None,
                                     host=None, capabilities=None, **kwargs):
         """Process a capability update from a service node."""
@@ -89,10 +94,18 @@ class SchedulerManager(manager.Manager):
                                                 host,
                                                 capabilities)
 
+    def _wait_for_scheduler(self):
+        # NOTE(dulek): We're waiting for scheduler to announce that it's ready
+        # or CONF.periodic_interval seconds from service startup has passed.
+        while self._startup_delay and not self.driver.is_ready():
+            eventlet.sleep(1)
+
     def create_consistencygroup(self, context, topic,
                                 group_id,
                                 request_spec_list=None,
                                 filter_properties_list=None):
+        self._wait_for_scheduler()
+
         try:
             self.driver.schedule_create_consistencygroup(
                 context, group_id,
@@ -117,6 +130,7 @@ class SchedulerManager(manager.Manager):
                       image_id=None, request_spec=None,
                       filter_properties=None):
+        self._wait_for_scheduler()
         try:
             flow_engine = create_volume.get_flow(context,
                                                  db, self.driver,
@@ -142,6 +156,8 @@ class SchedulerManager(manager.Manager):
                                filter_properties=None):
         """Ensure that the host exists and can accept the volume."""
+        self._wait_for_scheduler()
+
         def _migrate_volume_set_error(self, context, ex, request_spec):
             volume_state = {'volume_state': {'migration_status': None}}
             self._set_volume_state_and_notify('migrate_volume_to_host',
@@ -173,6 +189,9 @@ class SchedulerManager(manager.Manager):
         :param request_spec: parameters for this retype request
         :param filter_properties: parameters to filter by
         """
+
+        self._wait_for_scheduler()
+
         def _retype_volume_set_error(self, context, ex, request_spec,
                                      volume_ref, msg, reservations):
             if reservations:
@@ -223,6 +242,8 @@ class SchedulerManager(manager.Manager):
                         request_spec, filter_properties=None):
         """Ensure that the host exists and can accept the volume."""
+        self._wait_for_scheduler()
+
         def _manage_existing_set_error(self, context, ex, request_spec):
             volume_state = {'volume_state': {'status': 'error'}}
             self._set_volume_state_and_notify('manage_existing', volume_state,
@@ -244,7 +265,12 @@ class SchedulerManager(manager.Manager):
                                           request_spec.get('ref'))
 
     def get_pools(self, context, filters=None):
-        """Get active pools from scheduler's cache."""
+        """Get active pools from scheduler's cache.
+
+        NOTE(dulek): There's no self._wait_for_scheduler() because get_pools is
+        an RPC call (is blocking for the c-api). Also this is admin-only API
+        extension so it won't hurt the user much to retry the request manually.
+        """
         return self.driver.get_pools(context, filters)
 
     def _set_volume_state_and_notify(self, method, updates, context, ex,
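
Every scheduling entry point now opens with the same call, and the diff makes
one deliberate exception: get_pools stays synchronous because it is a blocking
RPC for the API service. To see the gate in action, one can run the startup
hook and a request in separate greenthreads. This is a hypothetical exercise
continuing the ManagerSketch example from the commit header above, not part of
the patch:

# A request arriving before the driver is ready naps in one-second
# steps, then proceeds as soon as capabilities show up.
import eventlet

mgr = ManagerSketch()
eventlet.spawn(mgr.init_host_with_rpc)

def request():
    mgr._wait_for_scheduler()
    return 'scheduled'

gt = eventlet.spawn(request)
eventlet.sleep(2)        # let the request poll a couple of times
mgr.driver.ready = True  # capability reports arrived
print(gt.wait())         # -> 'scheduled'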


@@ -22,6 +22,7 @@ import string
 import uuid
 
 import fixtures
+import mock
 from oslo_log import log as logging
 
 from cinder import service
@@ -69,7 +70,10 @@ class _IntegratedTestBase(test.TestCase):
 
         # set up services
         self.volume = self.start_service('volume')
-        self.scheduler = self.start_service('scheduler')
+        # NOTE(dulek): Mocking eventlet.sleep so test won't time out on
+        # scheduler service start.
+        with mock.patch('eventlet.sleep'):
+            self.scheduler = self.start_service('scheduler')
         self._start_api_service()
         self.addCleanup(self.osapi.stop)
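
The same trick works anywhere a fixture has to start the scheduler service:
patch eventlet.sleep for the duration of the start call so init_host_with_rpc
returns immediately instead of napping for CONF.periodic_interval seconds. A
minimal self-contained illustration, using plain unittest rather than the
Cinder test base, with slow_start as a hypothetical stand-in for the service
start:

import unittest
from unittest import mock  # the diff uses the external `mock` package

import eventlet


def slow_start():
    eventlet.sleep(60)  # stands in for init_host_with_rpc's startup nap
    return 'started'


class StartupTest(unittest.TestCase):
    def test_start_is_instant_under_mock(self):
        # Under the patch, the 60-second nap returns immediately.
        with mock.patch('eventlet.sleep') as sleep_mock:
            self.assertEqual('started', slow_start())
        sleep_mock.assert_called_once_with(60)


if __name__ == '__main__':
    unittest.main()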


@@ -49,6 +49,7 @@ class SchedulerManagerTestCase(test.TestCase):
         super(SchedulerManagerTestCase, self).setUp()
         self.flags(scheduler_driver=self.driver_cls_name)
         self.manager = self.manager_cls()
+        self.manager._startup_delay = False
         self.context = context.RequestContext('fake_user', 'fake_project')
         self.topic = 'fake_topic'
         self.fake_args = (1, 2, 3)
@@ -59,6 +60,15 @@ class SchedulerManagerTestCase(test.TestCase):
         manager = self.manager
         self.assertIsInstance(manager.driver, self.driver_cls)
 
+    @mock.patch('eventlet.sleep')
+    @mock.patch('cinder.volume.rpcapi.VolumeAPI.publish_service_capabilities')
+    def test_init_host_with_rpc(self, publish_capabilities_mock, sleep_mock):
+        self.manager._startup_delay = True
+        self.manager.init_host_with_rpc()
+        publish_capabilities_mock.assert_called_once_with(mock.ANY)
+        sleep_mock.assert_called_once_with(CONF.periodic_interval)
+        self.assertFalse(self.manager._startup_delay)
+
     @mock.patch('cinder.scheduler.driver.Scheduler.'
                 'update_service_capabilities')
     def test_update_service_capabilities_empty_dict(self, _mock_update_cap):
@@ -105,6 +115,65 @@ class SchedulerManagerTestCase(test.TestCase):
         _mock_sched_create.assert_called_once_with(self.context, request_spec,
                                                    {})
 
+    @mock.patch('cinder.scheduler.driver.Scheduler.schedule_create_volume')
+    @mock.patch('eventlet.sleep')
+    def test_create_volume_no_delay(self, _mock_sleep, _mock_sched_create):
+        fake_volume_id = 1
+        topic = 'fake_topic'
+
+        request_spec = {'volume_id': fake_volume_id}
+
+        self.manager.create_volume(self.context, topic, fake_volume_id,
+                                   request_spec=request_spec,
+                                   filter_properties={})
+        _mock_sched_create.assert_called_once_with(self.context, request_spec,
+                                                   {})
+        self.assertFalse(_mock_sleep.called)
+
+    @mock.patch('cinder.scheduler.driver.Scheduler.schedule_create_volume')
+    @mock.patch('cinder.scheduler.driver.Scheduler.is_ready')
+    @mock.patch('eventlet.sleep')
+    def test_create_volume_delay_scheduled_after_3_tries(self, _mock_sleep,
+                                                         _mock_is_ready,
+                                                         _mock_sched_create):
+        self.manager._startup_delay = True
+        fake_volume_id = 1
+        topic = 'fake_topic'
+
+        request_spec = {'volume_id': fake_volume_id}
+
+        _mock_is_ready.side_effect = [False, False, True]
+
+        self.manager.create_volume(self.context, topic, fake_volume_id,
+                                   request_spec=request_spec,
+                                   filter_properties={})
+        _mock_sched_create.assert_called_once_with(self.context, request_spec,
+                                                   {})
+        calls = [mock.call(1)] * 2
+        _mock_sleep.assert_has_calls(calls)
+        self.assertEqual(2, _mock_sleep.call_count)
+
+    @mock.patch('cinder.scheduler.driver.Scheduler.schedule_create_volume')
+    @mock.patch('cinder.scheduler.driver.Scheduler.is_ready')
+    @mock.patch('eventlet.sleep')
+    def test_create_volume_delay_scheduled_in_1_try(self, _mock_sleep,
+                                                    _mock_is_ready,
+                                                    _mock_sched_create):
+        self.manager._startup_delay = True
+        fake_volume_id = 1
+        topic = 'fake_topic'
+
+        request_spec = {'volume_id': fake_volume_id}
+
+        _mock_is_ready.return_value = True
+
+        self.manager.create_volume(self.context, topic, fake_volume_id,
+                                   request_spec=request_spec,
+                                   filter_properties={})
+        _mock_sched_create.assert_called_once_with(self.context, request_spec,
+                                                   {})
+        self.assertFalse(_mock_sleep.called)
+
     @mock.patch('cinder.scheduler.driver.Scheduler.host_passes_filters')
     @mock.patch('cinder.db.volume_update')
     def test_migrate_volume_exception_returns_volume_state(
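
The three new tests pin down the gate's behavior: no polling once
_startup_delay is cleared, exactly two one-second naps when is_ready flips
true on the third poll, and none when it is true immediately. The same
side_effect sequencing can be demonstrated outside the Cinder harness; in
this sketch, PollingGate is a hypothetical stand-in for the manager's
_wait_for_scheduler loop:

import unittest
from unittest import mock

import eventlet


class PollingGate(object):
    """Hypothetical stand-in for SchedulerManager._wait_for_scheduler."""

    def __init__(self, is_ready):
        self._startup_delay = True
        self._is_ready = is_ready

    def wait(self):
        while self._startup_delay and not self._is_ready():
            eventlet.sleep(1)


class PollingGateTest(unittest.TestCase):
    @mock.patch('eventlet.sleep')
    def test_ready_after_three_tries(self, sleep_mock):
        # is_ready comes back False twice, then True: expect two naps.
        is_ready = mock.Mock(side_effect=[False, False, True])
        PollingGate(is_ready).wait()
        sleep_mock.assert_has_calls([mock.call(1)] * 2)
        self.assertEqual(2, sleep_mock.call_count)

    @mock.patch('eventlet.sleep')
    def test_ready_immediately(self, sleep_mock):
        # True on the first poll: the gate never sleeps.
        PollingGate(mock.Mock(return_value=True)).wait()
        self.assertFalse(sleep_mock.called)


if __name__ == '__main__':
    unittest.main()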