Lookup a random action to execute

This patch change the behavior of scheduler to random pick up
a 'READY' action instead of the first 'READY' action.

Change-Id: I470c0aa3776f78273f4a4d623d0140c99b92214f
Partial-Bug: #1648681
This commit is contained in:
Ethan Lynn 2016-12-12 21:51:13 +08:00
parent ec1471f3de
commit f84a1a07dd
6 changed files with 18 additions and 15 deletions

View File

@ -368,8 +368,8 @@ def action_acquire(context, action_id, owner, timestamp):
return IMPL.action_acquire(context, action_id, owner, timestamp)
def action_acquire_1st_ready(context, owner, timestamp):
return IMPL.action_acquire_1st_ready(context, owner, timestamp)
def action_acquire_random_ready(context, owner, timestamp):
return IMPL.action_acquire_random_ready(context, owner, timestamp)
def action_abandon(context, action_id):

View File

@ -28,6 +28,7 @@ from oslo_utils import timeutils
import osprofiler.sqlalchemy
import sqlalchemy
from sqlalchemy.orm import joinedload_all
from sqlalchemy.sql.expression import func
from senlin.common import consts
from senlin.common import exception
@ -1136,11 +1137,12 @@ def action_acquire(context, action_id, owner, timestamp):
@oslo_db_api.wrap_db_retry(max_retries=3, retry_on_deadlock=True,
retry_interval=0.5, inc_retry_interval=True)
def action_acquire_1st_ready(context, owner, timestamp):
def action_acquire_random_ready(context, owner, timestamp):
with session_for_write() as session:
action = session.query(models.Action).\
filter_by(status=consts.ACTION_READY).\
filter_by(owner=None).\
order_by(func.random()).\
with_for_update().first()
if action:

View File

@ -117,8 +117,8 @@ class ThreadGroupManager(object):
batch_interval = cfg.CONF.batch_interval
while True:
timestamp = wallclock()
action = ao.Action.acquire_1st_ready(self.db_session, worker_id,
timestamp)
action = ao.Action.acquire_random_ready(self.db_session, worker_id,
timestamp)
if action:
if batch_size > 0 and 'NODE' in action.action:
if actions_launched < batch_size:

View File

@ -97,8 +97,8 @@ class Action(base.SenlinObject, base.VersionedObjectDictCompat):
return db_api.action_acquire(context, action_id, owner, timestamp)
@classmethod
def acquire_1st_ready(cls, context, owner, timestamp):
return db_api.action_acquire_1st_ready(context, owner, timestamp)
def acquire_random_ready(cls, context, owner, timestamp):
return db_api.action_acquire_random_ready(context, owner, timestamp)
@classmethod
def abandon(cls, context, action_id):

View File

@ -103,7 +103,7 @@ class DBAPIActionTest(base.SenlinTestCase):
retobj = db_api.action_get(new_ctx, action.id, project_safe=True)
self.assertIsNotNone(retobj)
def test_action_acquire_1st_ready(self):
def test_action_acquire_random_ready(self):
specs = [
{'name': 'A01', 'status': 'INIT'},
{'name': 'A02', 'status': 'READY', 'owner': 'worker1'},
@ -116,8 +116,9 @@ class DBAPIActionTest(base.SenlinTestCase):
worker = 'worker2'
timestamp = time.time()
action = db_api.action_acquire_1st_ready(self.ctx, worker, timestamp)
self.assertEqual('A04', action.name)
action = db_api.action_acquire_random_ready(self.ctx, worker,
timestamp)
self.assertIn(action.name, ('A02', 'A04'))
self.assertEqual('worker2', action.owner)
self.assertEqual(consts.ACTION_RUNNING, action.status)
self.assertEqual(timestamp, action.start_time)

View File

@ -90,7 +90,7 @@ class SchedulerTest(base.SenlinTestCase):
oslo_context.get_current(),
None, f)
@mock.patch.object(db_api, 'action_acquire_1st_ready')
@mock.patch.object(db_api, 'action_acquire_random_ready')
@mock.patch.object(db_api, 'action_acquire')
def test_start_action(self, mock_action_acquire,
mock_action_acquire_1st):
@ -113,7 +113,7 @@ class SchedulerTest(base.SenlinTestCase):
self.assertEqual(mock_thread, tgm.workers['0123'])
mock_thread.link.assert_called_once_with(mock.ANY, '0123')
@mock.patch.object(db_api, 'action_acquire_1st_ready')
@mock.patch.object(db_api, 'action_acquire_random_ready')
def test_start_action_no_action_id(self, mock_acquire_action):
mock_action = mock.Mock()
mock_action.id = '0123'
@ -135,7 +135,7 @@ class SchedulerTest(base.SenlinTestCase):
mock_thread.link.assert_called_once_with(mock.ANY, '0123')
@mock.patch.object(scheduler, 'sleep')
@mock.patch.object(db_api, 'action_acquire_1st_ready')
@mock.patch.object(db_api, 'action_acquire_random_ready')
def test_start_action_batch_control(self, mock_acquire_action, mock_sleep):
mock_action1 = mock.Mock()
mock_action1.id = 'ID1'
@ -158,7 +158,7 @@ class SchedulerTest(base.SenlinTestCase):
mock_sleep.assert_called_once_with(3)
@mock.patch.object(db_api, 'action_acquire_1st_ready')
@mock.patch.object(db_api, 'action_acquire_random_ready')
@mock.patch.object(db_api, 'action_acquire')
def test_start_action_failed_locking_action(self, mock_acquire_action,
mock_acquire_action_1st):
@ -171,7 +171,7 @@ class SchedulerTest(base.SenlinTestCase):
res = tgm.start_action('4567', '0123')
self.assertIsNone(res)
@mock.patch.object(db_api, 'action_acquire_1st_ready')
@mock.patch.object(db_api, 'action_acquire_random_ready')
def test_start_action_no_action_ready(self, mock_acquire_action):
mock_acquire_action.return_value = None
mock_group = mock.Mock()