From f84a1a07dd3c7d33acd04a7869e96ab29a949849 Mon Sep 17 00:00:00 2001 From: Ethan Lynn Date: Mon, 12 Dec 2016 21:51:13 +0800 Subject: [PATCH] Lookup a random action to execute This patch change the behavior of scheduler to random pick up a 'READY' action instead of the first 'READY' action. Change-Id: I470c0aa3776f78273f4a4d623d0140c99b92214f Partial-Bug: #1648681 --- senlin/db/api.py | 4 ++-- senlin/db/sqlalchemy/api.py | 4 +++- senlin/engine/scheduler.py | 4 ++-- senlin/objects/action.py | 4 ++-- senlin/tests/unit/db/test_action_api.py | 7 ++++--- senlin/tests/unit/engine/test_scheduler.py | 10 +++++----- 6 files changed, 18 insertions(+), 15 deletions(-) diff --git a/senlin/db/api.py b/senlin/db/api.py index 31756d1b9..da2d469b8 100644 --- a/senlin/db/api.py +++ b/senlin/db/api.py @@ -368,8 +368,8 @@ def action_acquire(context, action_id, owner, timestamp): return IMPL.action_acquire(context, action_id, owner, timestamp) -def action_acquire_1st_ready(context, owner, timestamp): - return IMPL.action_acquire_1st_ready(context, owner, timestamp) +def action_acquire_random_ready(context, owner, timestamp): + return IMPL.action_acquire_random_ready(context, owner, timestamp) def action_abandon(context, action_id): diff --git a/senlin/db/sqlalchemy/api.py b/senlin/db/sqlalchemy/api.py index 72718c583..4583edcf1 100644 --- a/senlin/db/sqlalchemy/api.py +++ b/senlin/db/sqlalchemy/api.py @@ -28,6 +28,7 @@ from oslo_utils import timeutils import osprofiler.sqlalchemy import sqlalchemy from sqlalchemy.orm import joinedload_all +from sqlalchemy.sql.expression import func from senlin.common import consts from senlin.common import exception @@ -1136,11 +1137,12 @@ def action_acquire(context, action_id, owner, timestamp): @oslo_db_api.wrap_db_retry(max_retries=3, retry_on_deadlock=True, retry_interval=0.5, inc_retry_interval=True) -def action_acquire_1st_ready(context, owner, timestamp): +def action_acquire_random_ready(context, owner, timestamp): with session_for_write() as session: action = session.query(models.Action).\ filter_by(status=consts.ACTION_READY).\ filter_by(owner=None).\ + order_by(func.random()).\ with_for_update().first() if action: diff --git a/senlin/engine/scheduler.py b/senlin/engine/scheduler.py index a19cd823e..faa24a81d 100644 --- a/senlin/engine/scheduler.py +++ b/senlin/engine/scheduler.py @@ -117,8 +117,8 @@ class ThreadGroupManager(object): batch_interval = cfg.CONF.batch_interval while True: timestamp = wallclock() - action = ao.Action.acquire_1st_ready(self.db_session, worker_id, - timestamp) + action = ao.Action.acquire_random_ready(self.db_session, worker_id, + timestamp) if action: if batch_size > 0 and 'NODE' in action.action: if actions_launched < batch_size: diff --git a/senlin/objects/action.py b/senlin/objects/action.py index 4c1e0d335..ee7995a23 100644 --- a/senlin/objects/action.py +++ b/senlin/objects/action.py @@ -97,8 +97,8 @@ class Action(base.SenlinObject, base.VersionedObjectDictCompat): return db_api.action_acquire(context, action_id, owner, timestamp) @classmethod - def acquire_1st_ready(cls, context, owner, timestamp): - return db_api.action_acquire_1st_ready(context, owner, timestamp) + def acquire_random_ready(cls, context, owner, timestamp): + return db_api.action_acquire_random_ready(context, owner, timestamp) @classmethod def abandon(cls, context, action_id): diff --git a/senlin/tests/unit/db/test_action_api.py b/senlin/tests/unit/db/test_action_api.py index d9fd47700..5f9c0d432 100644 --- a/senlin/tests/unit/db/test_action_api.py +++ b/senlin/tests/unit/db/test_action_api.py @@ -103,7 +103,7 @@ class DBAPIActionTest(base.SenlinTestCase): retobj = db_api.action_get(new_ctx, action.id, project_safe=True) self.assertIsNotNone(retobj) - def test_action_acquire_1st_ready(self): + def test_action_acquire_random_ready(self): specs = [ {'name': 'A01', 'status': 'INIT'}, {'name': 'A02', 'status': 'READY', 'owner': 'worker1'}, @@ -116,8 +116,9 @@ class DBAPIActionTest(base.SenlinTestCase): worker = 'worker2' timestamp = time.time() - action = db_api.action_acquire_1st_ready(self.ctx, worker, timestamp) - self.assertEqual('A04', action.name) + action = db_api.action_acquire_random_ready(self.ctx, worker, + timestamp) + self.assertIn(action.name, ('A02', 'A04')) self.assertEqual('worker2', action.owner) self.assertEqual(consts.ACTION_RUNNING, action.status) self.assertEqual(timestamp, action.start_time) diff --git a/senlin/tests/unit/engine/test_scheduler.py b/senlin/tests/unit/engine/test_scheduler.py index 24bd5a94d..efd417b19 100644 --- a/senlin/tests/unit/engine/test_scheduler.py +++ b/senlin/tests/unit/engine/test_scheduler.py @@ -90,7 +90,7 @@ class SchedulerTest(base.SenlinTestCase): oslo_context.get_current(), None, f) - @mock.patch.object(db_api, 'action_acquire_1st_ready') + @mock.patch.object(db_api, 'action_acquire_random_ready') @mock.patch.object(db_api, 'action_acquire') def test_start_action(self, mock_action_acquire, mock_action_acquire_1st): @@ -113,7 +113,7 @@ class SchedulerTest(base.SenlinTestCase): self.assertEqual(mock_thread, tgm.workers['0123']) mock_thread.link.assert_called_once_with(mock.ANY, '0123') - @mock.patch.object(db_api, 'action_acquire_1st_ready') + @mock.patch.object(db_api, 'action_acquire_random_ready') def test_start_action_no_action_id(self, mock_acquire_action): mock_action = mock.Mock() mock_action.id = '0123' @@ -135,7 +135,7 @@ class SchedulerTest(base.SenlinTestCase): mock_thread.link.assert_called_once_with(mock.ANY, '0123') @mock.patch.object(scheduler, 'sleep') - @mock.patch.object(db_api, 'action_acquire_1st_ready') + @mock.patch.object(db_api, 'action_acquire_random_ready') def test_start_action_batch_control(self, mock_acquire_action, mock_sleep): mock_action1 = mock.Mock() mock_action1.id = 'ID1' @@ -158,7 +158,7 @@ class SchedulerTest(base.SenlinTestCase): mock_sleep.assert_called_once_with(3) - @mock.patch.object(db_api, 'action_acquire_1st_ready') + @mock.patch.object(db_api, 'action_acquire_random_ready') @mock.patch.object(db_api, 'action_acquire') def test_start_action_failed_locking_action(self, mock_acquire_action, mock_acquire_action_1st): @@ -171,7 +171,7 @@ class SchedulerTest(base.SenlinTestCase): res = tgm.start_action('4567', '0123') self.assertIsNone(res) - @mock.patch.object(db_api, 'action_acquire_1st_ready') + @mock.patch.object(db_api, 'action_acquire_random_ready') def test_start_action_no_action_ready(self, mock_acquire_action): mock_acquire_action.return_value = None mock_group = mock.Mock()