Update scaling policy logic to be applied before action acceptance

This patch alters the cluster_scale_in and cluster_scale_out actions so
that an action is no longer placed into the actions table when a conflict
is detected. This is an improvement over the old processing flow, as the
requester now receives immediate feedback from the API when an action
cannot be processed. This change also honors the scaling action cooldown
in the same manner, returning an error via the API when a scaling action
cannot be processed due to cooldown.

Depends-On: https://review.openstack.org/#/c/602460/
Implements: blueprint scaling-action-acceptance
Change-Id: If0dcf5e427d3d6973d2c5e52fada8a6c925240d5
Jude Cross 2018-07-24 17:03:07 -07:00
parent a34e45b696
commit 364b1402a1
15 changed files with 187 additions and 9 deletions


@ -437,6 +437,7 @@ Response Codes
- 401
- 403
- 404
- 409
- 503
Request Parameters
@ -494,6 +495,7 @@ Response Codes
- 401
- 403
- 404
- 409
- 503
Request Parameters


@ -22,7 +22,7 @@ Concept
~~~~~~~
A :term:`Policy Type` is an abstract specification of the rules to be checked
-and/or enforced when certain :term:`Action` is performed on a cluster that
+and/or enforced when an :term:`Action` is performed on a cluster that
contains nodes of certain :term:`Profile Type`.
A registry of policy types is built in memory when the Senlin engine


@ -49,14 +49,16 @@ scaling behavior in both directions using the same policy.
Senlin has carefully designed the builtin policy types so that for scaling
policies, you can attach more than one instance of the same policy type but
-you may get an error when you are attempting to attach two policies of anther
+you may get an error when you are attempting to attach two policies of another
type (say ``senlin.policy.deletion``) to the same cluster.
The value of the ``event`` property indicates when the policy will be checked.
A policy with ``event`` set to "``CLUSTER_SCALE_IN``" will be checked when and
only when a corresponding action is triggered on the cluster. A policy with
``event`` set to "``CLUSTER_SCALE_OUT``" will be checked when and only when
-a corresponding action is triggered.
+a corresponding action is triggered. If the cluster is currently processing a
+scaling action it will not accept another scaling action until the current
+action has been processed and cooldown has been observed.
For both types of actions that can trigger the scaling policy, there are
always three types of adjustments to choose from as listed below. The type


@ -0,0 +1,21 @@
---
prelude: >
This release alters the cluster_scale_in and cluster_scale_out actions so
that an action is no longer placed into the actions table when a conflict
is detected. This is an improvement over the old processing flow, as the
requester now receives immediate feedback from the API when an action
cannot be processed. This release also honors the scaling action cooldown
in the same manner, returning an error via the API when a scaling action
cannot be processed due to cooldown.
features:
- |
Scaling actions (IN or OUT) now validate that there is no conflicting
action already being processed and will return an error via the API
informing the end user if a conflict is detected. A conflicting action is
detected when a new action of either `CLUSTER_SCALE_IN` or
`CLUSTER_SCALE_OUT` is attempted while there is already a cluster scaling
action in the action table in a pending status (READY, RUNNING, WAITING,
ACTION_WAITING_LIFECYCLE_COMPLETION).
Additionally, the cooldown will be checked and enforced when a scaling
action is requested. If the cooldown is being observed, the requester
will be informed via an error when submitting the action.
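The conflict check described above can be sketched as a small standalone function. This is an illustrative simplification, not the actual Senlin implementation; the field names and status strings are taken from the release note.

```python
# Statuses that mark a scaling action as still pending, per the release note.
PENDING_STATUSES = {"READY", "RUNNING", "WAITING",
                    "WAITING_LIFECYCLE_COMPLETION"}
SCALE_ACTIONS = {"CLUSTER_SCALE_IN", "CLUSTER_SCALE_OUT"}


def conflicting_scaling_actions(actions, cluster_id):
    """Return the IDs of scaling actions on the cluster still in flight.

    ``actions`` is a list of dicts with ``id``, ``target``, ``action`` and
    ``status`` keys -- a hypothetical flattened view of the actions table.
    """
    return [a["id"] for a in actions
            if a["target"] == cluster_id
            and a["action"] in SCALE_ACTIONS
            and a["status"] in PENDING_STATUSES]
```

A non-empty result means the new scaling request would be rejected with a 409 instead of being queued.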


@ -42,6 +42,8 @@ class FaultWrapper(wsgi.Middleware):
"""Replace error body with something the client can parse."""
error_map = {
+'ActionConflict': webob.exc.HTTPConflict,
+'ActionCooldown': webob.exc.HTTPConflict,
'ActionInProgress': webob.exc.HTTPConflict,
'BadRequest': webob.exc.HTTPBadRequest,
'FeatureNotSupported': webob.exc.HTTPConflict,
@ -57,7 +59,6 @@ class FaultWrapper(wsgi.Middleware):
'RequestLimitExceeded': webob.exc.HTTPBadRequest,
'ResourceInUse': webob.exc.HTTPConflict,
'ResourceIsLocked': webob.exc.HTTPConflict,
-'ActionConflict': webob.exc.HTTPConflict,
'ResourceNotFound': webob.exc.HTTPNotFound,
}


@ -111,3 +111,9 @@ it can be used by both users and developers.
are now sent directly in the query body rather than in the params
field.
1.11
----
- Modified the ``cluster_action`` API. The API now responds with
response code 409 when a scaling action conflicts with one already
being processed or a cooldown for a scaling action is encountered.
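Since the API now rejects conflicting or cooled-down scaling requests with a 409 rather than queueing them, a caller may want to retry. The sketch below shows one hedged approach; ``trigger`` is a hypothetical callable returning an HTTP status code, not part of the Senlin client.

```python
import time


def scale_with_retry(trigger, retries=3, delay=1.0):
    """Retry a scaling request that may be rejected with HTTP 409.

    ``trigger`` performs the cluster_action request and returns the HTTP
    status code; on 409 (conflict or cooldown) we wait and retry.
    """
    for _ in range(retries):
        status = trigger()
        if status != 409:
            return status
        time.sleep(delay)
    return 409
```

In practice the delay would be chosen relative to the scaling policy's cooldown period.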


@ -24,7 +24,7 @@ class VersionController(object):
# This includes any semantic changes which may not affect the input or
# output formats or even originate in the API code layer.
_MIN_API_VERSION = "1.0"
-_MAX_API_VERSION = "1.10"
+_MAX_API_VERSION = "1.11"
DEFAULT_API_VERSION = _MIN_API_VERSION


@ -64,6 +64,8 @@ CLUSTER_ACTION_NAMES = (
'CLUSTER_OPERATION',
)
CLUSTER_SCALE_ACTIONS = [CLUSTER_SCALE_IN, CLUSTER_SCALE_OUT]
NODE_ACTION_NAMES = (
NODE_CREATE, NODE_DELETE, NODE_UPDATE,
NODE_JOIN, NODE_LEAVE,


@ -194,6 +194,12 @@ class ActionConflict(SenlinException):
"the following action(s): %(actions)s")
class ActionCooldown(SenlinException):
msg_fmt = _("The %(type)s action for cluster %(cluster)s cannot be "
"processed while the cooldown for policy %(policy_id)s is "
"still in progress")
class NodeNotOrphan(SenlinException):
msg_fmt = _("%(message)s")
@ -202,8 +208,8 @@ class InternalError(SenlinException):
"""A base class for internal exceptions in senlin.
The internal exception classes which inherit from :class:`SenlinException`
-class should be translated to a user facing exception type if need to be
-made user visible.
+class should be translated to a user facing exception type if they need to
+be made user visible.
"""
msg_fmt = _("%(message)s")
message = _('Internal error happened')


@ -329,6 +329,11 @@ def action_get(context, action_id, project_safe=True, refresh=False):
refresh=refresh)
def action_list_active_scaling(context, cluster_id, project_safe=True):
return IMPL.action_list_active_scaling(context, cluster_id,
project_safe=project_safe)
def action_get_by_name(context, name, project_safe=True):
return IMPL.action_get_by_name(context, name, project_safe=project_safe)


@ -1067,6 +1067,25 @@ def action_get(context, action_id, project_safe=True, refresh=False):
return action
def action_list_active_scaling(context, cluster_id=None, project_safe=True):
with session_for_read() as session:
query = session.query(models.Action)
if project_safe:
query = query.filter_by(project=context.project_id)
if cluster_id:
query = query.filter_by(target=cluster_id)
query = query.filter(
models.Action.status.in_(
[consts.ACTION_READY,
consts.ACTION_WAITING,
consts.ACTION_RUNNING,
consts.ACTION_WAITING_LIFECYCLE_COMPLETION]))
query = query.filter(
models.Action.action.in_(consts.CLUSTER_SCALE_ACTIONS))
scaling_actions = query.all()
return scaling_actions
def action_get_by_name(context, name, project_safe=True):
return query_by_name(context, models.Action, name,
project_safe=project_safe)


@ -274,6 +274,9 @@ class Action(object):
}
c = req_context.RequestContext.from_dict(params)
if action in consts.CLUSTER_SCALE_ACTIONS:
Action.validate_scaling_action(c, target, action)
obj = cls(target, action, c, **kwargs)
return obj.store(ctx)
@ -466,6 +469,70 @@ class Action(object):
return
return
@staticmethod
def validate_scaling_action(ctx, cluster_id, action):
"""Validate scaling action against actions table and policy cooldown.
:param ctx: An instance of the request context.
:param cluster_id: ID of the cluster the scaling action is targeting.
:param action: Scaling action being validated.
:return: None
:raises: An ``ActionCooldown`` exception when the action being
validated is still within the policy's cooldown window, or an
``ActionConflict`` exception when a conflicting scaling action is
already in the actions table.
"""
# Check for conflicting actions in the actions table.
conflicting_actions = Action._get_conflicting_scaling_actions(
ctx, cluster_id)
if conflicting_actions:
action_ids = [a.get('id', None) for a in conflicting_actions]
LOG.info("Unable to process %(action)s for cluster %(cluster_id)s: "
"the action conflicts with %(conflicts)s",
{'action': action,
'cluster_id': cluster_id,
'conflicts': action_ids})
raise exception.ActionConflict(
type=action,
target=cluster_id,
actions=",".join(action_ids))
# Check to see if action cooldown should be observed.
bindings = cpo.ClusterPolicy.get_all(ctx, cluster_id,
sort='priority',
filters={'enabled': True})
for pb in bindings:
policy = policy_mod.Policy.load(ctx, pb.policy_id)
if getattr(policy, 'cooldown', None) and policy.event == action:
if pb.last_op and not timeutils.is_older_than(
pb.last_op, policy.cooldown):
LOG.info("Unable to process %(action)s for cluster "
"%(cluster_id)s: the cooldown for policy %(policy)s "
"is still in progress",
{'action': action,
'cluster_id': cluster_id,
'policy': pb.policy_id})
raise exception.ActionCooldown(
type=action,
cluster=cluster_id,
policy_id=pb.policy_id)
return
@staticmethod
def _get_conflicting_scaling_actions(ctx, cluster_id):
"""Check actions table for conflicting scaling actions.
:param ctx: An instance of the request context.
:param cluster_id: ID of the cluster the scaling action is targeting.
:return: A list of conflicting actions.
"""
scaling_actions = ao.Action.action_list_active_scaling(
ctx, cluster_id)
if scaling_actions:
return [a.to_dict() for a in scaling_actions]
else:
return None
def to_dict(self):
if self.id:
dep_on = dobj.Dependency.get_depended(self.context, self.id)
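The cooldown test inside ``validate_scaling_action`` above, which relies on ``timeutils.is_older_than``, can be sketched in isolation. This is a minimal stand-in using only the standard library, not the actual oslo.utils implementation.

```python
from datetime import datetime, timedelta, timezone


def cooldown_in_progress(last_op, cooldown_seconds, now=None):
    """True if the policy's cooldown window has not yet elapsed.

    ``last_op`` is the timestamp of the last policy operation (or None if
    the policy has never fired); ``cooldown_seconds`` comes from the
    policy spec. ``now`` is injectable for testing.
    """
    if last_op is None:
        return False
    now = now or datetime.now(timezone.utc)
    # Mirrors "not timeutils.is_older_than(last_op, cooldown)":
    # the action is rejected while last_op is newer than the window.
    return now - last_op < timedelta(seconds=cooldown_seconds)
```

When this returns True for a binding whose ``event`` matches the requested action, the API answers with 409 via ``ActionCooldown``.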


@ -2511,7 +2511,7 @@ class EngineService(service.Service):
receiver = receiver_obj.Receiver.find(ctx, identity)
try:
-cluster = co.Cluster.find(ctx, receiver.cluster_id)
+db_cluster = co.Cluster.find(ctx, receiver.cluster_id)
except (exception.ResourceNotFound, exception.MultipleChoices) as ex:
msg = ex.enhance_msg('referenced', ex)
raise exception.BadRequest(msg=msg)
@ -2527,7 +2527,7 @@ class EngineService(service.Service):
'inputs': data
}
-action_id = action_mod.Action.create(ctx, cluster.id,
+action_id = action_mod.Action.create(ctx, db_cluster.id,
receiver.action, **kwargs)
dispatcher.start_action()
LOG.info("Webhook %(w)s triggered with action queued: %(a)s.",


@ -95,6 +95,11 @@ class Action(base.SenlinObject, base.VersionedObjectDictCompat):
obj = db_api.action_get_by_short_id(context, short_id, **kwargs)
return cls._from_db_object(context, cls(), obj)
@classmethod
def action_list_active_scaling(cls, context, cluster_id, **kwargs):
objs = db_api.action_list_active_scaling(context, cluster_id, **kwargs)
return [cls._from_db_object(context, cls(), obj) for obj in objs]
@classmethod
def get_all(cls, context, **kwargs):
objs = db_api.action_get_all(context, **kwargs)


@ -94,6 +94,11 @@ class ActionBaseTest(base.SenlinTestCase):
self.assertIsNone(obj.updated_at)
self.assertEqual({}, obj.data)
def _create_cp_binding(self, cluster_id, policy_id):
return cpo.ClusterPolicy(cluster_id=cluster_id, policy_id=policy_id,
enabled=True, id=uuidutils.generate_uuid(),
last_op=None)
@mock.patch.object(cluster_mod.Cluster, 'load')
def test_action_new_cluster(self, mock_load):
fake_cluster = mock.Mock(timeout=cfg.CONF.default_action_timeout)
@ -337,6 +342,43 @@ class ActionBaseTest(base.SenlinTestCase):
mock_store.assert_not_called()
mock_active.assert_called_once_with(mock.ANY, OBJID)
@mock.patch.object(timeutils, 'is_older_than')
@mock.patch.object(cpo.ClusterPolicy, 'get_all')
@mock.patch.object(policy_mod.Policy, 'load')
@mock.patch.object(ab.Action, 'store')
def test_action_create_scaling_cooldown_in_progress(self, mock_store,
mock_load,
mock_load_all,
mock_time_util):
cluster_id = CLUSTER_ID
# Note: policy is mocked
policy_id = uuidutils.generate_uuid()
policy = mock.Mock(id=policy_id,
TARGET=[('AFTER', 'CLUSTER_SCALE_OUT')],
event='CLUSTER_SCALE_OUT',
cooldown=240)
pb = self._create_cp_binding(cluster_id, policy.id)
pb.last_op = timeutils.utcnow(True)
mock_load_all.return_value = [pb]
mock_load.return_value = policy
mock_time_util.return_value = False
self.assertRaises(exception.ActionCooldown, ab.Action.create, self.ctx,
cluster_id, 'CLUSTER_SCALE_OUT')
self.assertEqual(0, mock_store.call_count)
@mock.patch.object(ao.Action, 'action_list_active_scaling')
@mock.patch.object(ab.Action, 'store')
def test_action_create_scaling_conflict(self, mock_store,
mock_list_active):
cluster_id = CLUSTER_ID
mock_action = mock.Mock()
mock_action.to_dict.return_value = {'id': 'fake_action_id'}
mock_list_active.return_value = [mock_action]
self.assertRaises(exception.ActionConflict, ab.Action.create, self.ctx,
cluster_id, 'CLUSTER_SCALE_IN')
self.assertEqual(0, mock_store.call_count)
def test_action_delete(self):
result = ab.Action.delete(self.ctx, 'non-existent')
self.assertIsNone(result)