Merge "Fix cooldown check"
This commit is contained in:
commit
765e48ff9b
|
@ -427,14 +427,10 @@ class Action(object):
|
|||
|
||||
for pb in bindings:
|
||||
policy = policy_mod.Policy.load(self.context, pb.policy_id)
|
||||
# We record the last operation time for all policies bound to the
|
||||
# cluster, no matter that policy is only interested in the
|
||||
# "BEFORE" or "AFTER" or both.
|
||||
if target == 'AFTER':
|
||||
ts = timeutils.utcnow(True)
|
||||
pb.last_op = ts
|
||||
cpo.ClusterPolicy.update(self.context, pb.cluster_id,
|
||||
pb.policy_id, {'last_op': ts})
|
||||
|
||||
# add last_op as input for the policy so that it can be used
|
||||
# during pre_op
|
||||
self.inputs['last_op'] = pb.last_op
|
||||
|
||||
if not policy.need_check(target, self):
|
||||
continue
|
||||
|
@ -444,13 +440,6 @@ class Action(object):
|
|||
else: # target == 'AFTER'
|
||||
method = getattr(policy, 'post_op', None)
|
||||
|
||||
if getattr(policy, 'cooldown', None):
|
||||
if pb.cooldown_inprogress(policy.cooldown):
|
||||
self.data['status'] = policy_mod.CHECK_ERROR
|
||||
self.data['reason'] = ('Policy %s cooldown is still '
|
||||
'in progress.') % policy.id
|
||||
return
|
||||
|
||||
if method is not None:
|
||||
method(cluster_id, self)
|
||||
|
||||
|
|
|
@ -12,8 +12,6 @@
|
|||
|
||||
"""Cluster-policy binding object."""
|
||||
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from senlin.db import api as db_api
|
||||
from senlin.objects import base
|
||||
from senlin.objects import cluster as cluster_obj
|
||||
|
@ -86,13 +84,6 @@ class ClusterPolicy(base.SenlinObject, base.VersionedObjectDictCompat):
|
|||
def delete(cls, context, cluster_id, policy_id):
|
||||
db_api.cluster_policy_detach(context, cluster_id, policy_id)
|
||||
|
||||
def cooldown_inprogress(self, cooldown):
|
||||
last_op = self.last_op
|
||||
if last_op and not timeutils.is_older_than(last_op, cooldown):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def to_dict(self):
|
||||
binding_dict = {
|
||||
'id': self.id,
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
# under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from senlin.common import constraints
|
||||
from senlin.common import consts
|
||||
|
@ -19,6 +20,7 @@ from senlin.common.i18n import _
|
|||
from senlin.common import scaleutils as su
|
||||
from senlin.common import schema
|
||||
from senlin.common import utils
|
||||
from senlin.objects import cluster_policy as cpo
|
||||
from senlin.policies import base
|
||||
|
||||
|
||||
|
@ -44,6 +46,8 @@ class ScalingPolicy(base.Policy):
|
|||
TARGET = [
|
||||
('BEFORE', consts.CLUSTER_SCALE_IN),
|
||||
('BEFORE', consts.CLUSTER_SCALE_OUT),
|
||||
('AFTER', consts.CLUSTER_SCALE_IN),
|
||||
('AFTER', consts.CLUSTER_SCALE_OUT),
|
||||
]
|
||||
|
||||
PROFILE_TYPE = [
|
||||
|
@ -186,6 +190,17 @@ class ScalingPolicy(base.Policy):
|
|||
:return: None.
|
||||
"""
|
||||
|
||||
# check cooldown
|
||||
last_op = action.inputs.get('last_op', None)
|
||||
if last_op and not timeutils.is_older_than(last_op, self.cooldown):
|
||||
action.data.update({
|
||||
'status': base.CHECK_ERROR,
|
||||
'reason': _('Policy %s cooldown is still '
|
||||
'in progress.') % self.id
|
||||
})
|
||||
action.store(action.context)
|
||||
return
|
||||
|
||||
# Use action input if count is provided
|
||||
count_value = action.inputs.get('count', None)
|
||||
cluster = action.entity
|
||||
|
@ -243,10 +258,26 @@ class ScalingPolicy(base.Policy):
|
|||
|
||||
return
|
||||
|
||||
def need_check(self, target, action):
|
||||
res = super(ScalingPolicy, self).need_check(target, action)
|
||||
if res:
|
||||
# Check if the action is expected by the policy
|
||||
res = (self.event == action.action)
|
||||
def post_op(self, cluster_id, action):
|
||||
# update last_op for next cooldown check
|
||||
ts = timeutils.utcnow(True)
|
||||
cpo.ClusterPolicy.update(action.context, cluster_id,
|
||||
self.id, {'last_op': ts})
|
||||
|
||||
return res
|
||||
def need_check(self, target, action):
|
||||
# check if target + action matches policy targets
|
||||
if not super(ScalingPolicy, self).need_check(target, action):
|
||||
return False
|
||||
|
||||
if target == 'BEFORE':
|
||||
# Scaling policy BEFORE check should only be triggered if the
|
||||
# incoming action matches the specific policy event.
|
||||
# E.g. for scale-out policy the BEFORE check to select nodes for
|
||||
# termination should only run for scale-out actions.
|
||||
return self.event == action.action
|
||||
else:
|
||||
# Scaling policy AFTER check to reset cooldown timer should be
|
||||
# triggered for all supported policy events (both scale-in and
|
||||
# scale-out). E.g. a scale-out policy should reset cooldown timer
|
||||
# whenever scale-out or scale-in action completes.
|
||||
return action.action in list(self._SUPPORTED_EVENTS)
|
||||
|
|
|
@ -682,7 +682,7 @@ class ActionPolicyCheckTest(base.SenlinTestCase):
|
|||
filters={'enabled': True})
|
||||
mock_load.assert_called_once_with(action.context, policy.id)
|
||||
# last_op was updated anyway
|
||||
self.assertIsNotNone(pb.last_op)
|
||||
self.assertEqual(action.inputs['last_op'], pb.last_op)
|
||||
# neither pre_op nor post_op was called, because target not match
|
||||
self.assertEqual(0, mock_pre_op.call_count)
|
||||
self.assertEqual(0, mock_post_op.call_count)
|
||||
|
@ -768,45 +768,11 @@ class ActionPolicyCheckTest(base.SenlinTestCase):
|
|||
filters={'enabled': True})
|
||||
mock_load.assert_called_once_with(action.context, policy.id)
|
||||
# last_op was updated for POST check
|
||||
self.assertIsNotNone(pb.last_op)
|
||||
self.assertEqual(action.inputs['last_op'], pb.last_op)
|
||||
# pre_op is called, but post_op was not called
|
||||
self.assertEqual(0, policy.pre_op.call_count)
|
||||
policy.post_op.assert_called_once_with(cluster_id, action)
|
||||
|
||||
@mock.patch.object(cpo.ClusterPolicy, 'cooldown_inprogress')
|
||||
@mock.patch.object(cpo.ClusterPolicy, 'get_all')
|
||||
@mock.patch.object(policy_mod.Policy, 'load')
|
||||
def test_policy_check_cooldown_inprogress(self, mock_load, mock_load_all,
|
||||
mock_inprogress):
|
||||
cluster_id = CLUSTER_ID
|
||||
# Note: policy is mocked
|
||||
policy_id = uuidutils.generate_uuid()
|
||||
policy = mock.Mock(id=policy_id, TARGET=[('AFTER', 'OBJECT_ACTION')])
|
||||
# Note: policy binding is created but not stored
|
||||
pb = self._create_cp_binding(cluster_id, policy.id)
|
||||
mock_inprogress.return_value = True
|
||||
mock_load_all.return_value = [pb]
|
||||
mock_load.return_value = policy
|
||||
action = ab.Action(cluster_id, 'OBJECT_ACTION', self.ctx)
|
||||
|
||||
# Do it
|
||||
res = action.policy_check(CLUSTER_ID, 'AFTER')
|
||||
|
||||
self.assertIsNone(res)
|
||||
self.assertEqual(policy_mod.CHECK_ERROR, action.data['status'])
|
||||
self.assertEqual(
|
||||
'Policy %s cooldown is still in progress.' % policy_id,
|
||||
six.text_type(action.data['reason']))
|
||||
mock_load_all.assert_called_once_with(
|
||||
action.context, cluster_id, sort='priority',
|
||||
filters={'enabled': True})
|
||||
mock_load.assert_called_once_with(action.context, policy.id)
|
||||
# last_op was updated for POST check
|
||||
self.assertIsNotNone(pb.last_op)
|
||||
# neither pre_op nor post_op was not called, due to cooldown
|
||||
self.assertEqual(0, policy.pre_op.call_count)
|
||||
self.assertEqual(0, policy.post_op.call_count)
|
||||
|
||||
@mock.patch.object(cpo.ClusterPolicy, 'get_all')
|
||||
@mock.patch.object(policy_mod.Policy, 'load')
|
||||
@mock.patch.object(ab.Action, '_check_result')
|
||||
|
|
|
@ -1,39 +0,0 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from datetime import timedelta
|
||||
import testtools
|
||||
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from senlin.objects import cluster_policy as cpo
|
||||
|
||||
CLUSTER_ID = "8286fcaa-6474-44e2-873e-28b5cb2c204c"
|
||||
POLICY_ID = "da958a16-f384-49a1-83a9-abac8b4ec46e"
|
||||
|
||||
|
||||
class TestClusterPolicy(testtools.TestCase):
|
||||
|
||||
def test_cooldown_inprogress(self):
|
||||
last_op = timeutils.utcnow(True)
|
||||
cp = cpo.ClusterPolicy(cluster_id=CLUSTER_ID, policy_id=POLICY_ID,
|
||||
last_op=last_op)
|
||||
|
||||
res = cp.cooldown_inprogress(60)
|
||||
|
||||
self.assertTrue(res)
|
||||
|
||||
cp.last_op -= timedelta(hours=1)
|
||||
|
||||
res = cp.cooldown_inprogress(60)
|
||||
|
||||
self.assertFalse(res)
|
|
@ -11,10 +11,13 @@
|
|||
# under the License.
|
||||
|
||||
import mock
|
||||
from oslo_utils import timeutils
|
||||
import six
|
||||
import time
|
||||
|
||||
from senlin.common import consts
|
||||
from senlin.common import exception as exc
|
||||
from senlin.objects import cluster_policy as cpo
|
||||
from senlin.objects import node as no
|
||||
from senlin.policies import base as pb
|
||||
from senlin.policies import scaling_policy as sp
|
||||
|
@ -207,14 +210,17 @@ class TestScalingPolicy(base.SenlinTestCase):
|
|||
action = mock.Mock()
|
||||
action.context = self.context
|
||||
action.action = consts.CLUSTER_SCALE_IN
|
||||
action.inputs = {'count': 1}
|
||||
action.inputs = {'count': 1, 'last_op': timeutils.utcnow(True)}
|
||||
action.entity = self.cluster
|
||||
|
||||
adjustment = self.spec['properties']['adjustment']
|
||||
adjustment['type'] = consts.CHANGE_IN_CAPACITY
|
||||
adjustment['number'] = 2
|
||||
adjustment['cooldown'] = 1
|
||||
policy = sp.ScalingPolicy('p1', self.spec)
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
policy.pre_op(self.cluster['id'], action)
|
||||
pd = {
|
||||
'deletion': {
|
||||
|
@ -238,6 +244,26 @@ class TestScalingPolicy(base.SenlinTestCase):
|
|||
}
|
||||
action.data.update.assert_called_with(pd)
|
||||
|
||||
def test_pre_op_within_cooldown(self):
|
||||
action = mock.Mock()
|
||||
action.context = self.context
|
||||
action.action = consts.CLUSTER_SCALE_IN
|
||||
action.inputs = {'last_op': timeutils.utcnow(True)}
|
||||
action.entity = self.cluster
|
||||
|
||||
adjustment = self.spec['properties']['adjustment']
|
||||
adjustment['cooldown'] = 300
|
||||
kwargs = {'id': "FAKE_ID"}
|
||||
policy = sp.ScalingPolicy('p1', self.spec, **kwargs)
|
||||
|
||||
policy.pre_op('FAKE_CLUSTER_ID', action)
|
||||
pd = {
|
||||
'status': pb.CHECK_ERROR,
|
||||
'reason': "Policy FAKE_ID cooldown is still in progress.",
|
||||
}
|
||||
action.data.update.assert_called_with(pd)
|
||||
action.store.assert_called_with(self.context)
|
||||
|
||||
@mock.patch.object(sp.ScalingPolicy, '_calculate_adjustment_count')
|
||||
def test_pre_op_pass_check_effort(self, mock_adjustmentcount):
|
||||
# Cluster with maxsize and best_effort is False
|
||||
|
@ -367,7 +393,23 @@ class TestScalingPolicy(base.SenlinTestCase):
|
|||
action.data.update.assert_called_with(pd)
|
||||
action.store.assert_called_with(self.context)
|
||||
|
||||
def test_need_check_in_event(self):
|
||||
@mock.patch.object(cpo.ClusterPolicy, 'update')
|
||||
@mock.patch.object(timeutils, 'utcnow')
|
||||
def test_post_op(self, mock_time, mock_cluster_policy):
|
||||
action = mock.Mock()
|
||||
action.context = self.context
|
||||
|
||||
mock_time.return_value = 'FAKE_TIME'
|
||||
|
||||
kwargs = {'id': 'FAKE_POLICY_ID'}
|
||||
policy = sp.ScalingPolicy('test-policy', self.spec, **kwargs)
|
||||
|
||||
policy.post_op('FAKE_CLUSTER_ID', action)
|
||||
mock_cluster_policy.assert_called_once_with(
|
||||
action.context, 'FAKE_CLUSTER_ID', 'FAKE_POLICY_ID',
|
||||
{'last_op': 'FAKE_TIME'})
|
||||
|
||||
def test_need_check_in_event_before(self):
|
||||
action = mock.Mock()
|
||||
action.context = self.context
|
||||
action.action = consts.CLUSTER_SCALE_IN
|
||||
|
@ -377,7 +419,7 @@ class TestScalingPolicy(base.SenlinTestCase):
|
|||
res = policy.need_check('BEFORE', action)
|
||||
self.assertTrue(res)
|
||||
|
||||
def test_need_check_not_in_event(self):
|
||||
def test_need_check_not_in_event_before(self):
|
||||
action = mock.Mock()
|
||||
action.context = self.context
|
||||
action.action = consts.CLUSTER_SCALE_OUT
|
||||
|
@ -386,3 +428,23 @@ class TestScalingPolicy(base.SenlinTestCase):
|
|||
policy = sp.ScalingPolicy('test-policy', self.spec)
|
||||
res = policy.need_check('BEFORE', action)
|
||||
self.assertFalse(res)
|
||||
|
||||
def test_need_check_in_event_after(self):
|
||||
action = mock.Mock()
|
||||
action.context = self.context
|
||||
action.action = consts.CLUSTER_SCALE_OUT
|
||||
action.data = {}
|
||||
|
||||
policy = sp.ScalingPolicy('test-policy', self.spec)
|
||||
res = policy.need_check('AFTER', action)
|
||||
self.assertTrue(res)
|
||||
|
||||
def test_need_check_not_in_event_after(self):
|
||||
action = mock.Mock()
|
||||
action.context = self.context
|
||||
action.action = consts.CLUSTER_ATTACH_POLICY
|
||||
action.data = {}
|
||||
|
||||
policy = sp.ScalingPolicy('test-policy', self.spec)
|
||||
res = policy.need_check('AFTER', action)
|
||||
self.assertFalse(res)
|
||||
|
|
Loading…
Reference in New Issue