Merge "Update scaling policy logic to be applied before action acceptance"
This commit is contained in:
commit
111ea8eabd
|
@ -437,6 +437,7 @@ Response Codes
|
|||
- 401
|
||||
- 403
|
||||
- 404
|
||||
- 409
|
||||
- 503
|
||||
|
||||
Request Parameters
|
||||
|
@ -494,6 +495,7 @@ Response Codes
|
|||
- 401
|
||||
- 403
|
||||
- 404
|
||||
- 409
|
||||
- 503
|
||||
|
||||
Request Parameters
|
||||
|
|
|
@ -22,7 +22,7 @@ Concept
|
|||
~~~~~~~
|
||||
|
||||
A :term:`Policy Type` is an abstract specification of the rules to be checked
|
||||
and/or enforced when certain :term:`Action` is performed on a cluster that
|
||||
and/or enforced when an :term:`Action` is performed on a cluster that
|
||||
contains nodes of certain :term:`Profile Type`.
|
||||
|
||||
A registry of policy types is built in memory when the Senlin engine
|
||||
|
|
|
@ -49,14 +49,16 @@ scaling behavior in both directions using the same policy.
|
|||
|
||||
Senlin has carefully designed the builtin policy types so that for scaling
|
||||
policies, you can attach more than one instance of the same policy type but
|
||||
you may get an error when you are attempting to attach two policies of anther
|
||||
you may get an error when you are attempting to attach two policies of another
|
||||
type (say ``senlin.policy.deletion``) to the same cluster.
|
||||
|
||||
The value of the ``event`` property indicates when the policy will be checked.
|
||||
A policy with ``event`` set to "``CLUSTER_SCALE_IN``" will be checked when and
|
||||
only when a corresponding action is triggered on the cluster. A policy with
|
||||
``event`` set to "``CLUSTER_SCALE_OUT``" will be checked when and only when
|
||||
a corresponding action is triggered.
|
||||
a corresponding action is triggered. If the cluster is currently processing a
|
||||
scaling action it will not accept another scaling action until the current
|
||||
action has been processed and cooldown has been observed.
|
||||
|
||||
For both types of actions that can trigger the scaling policy, there are
|
||||
always three types of adjustments to choose from as listed below. The type
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
---
|
||||
prelude: >
|
||||
This release alters the cluster_scale_in and cluster_scale_out actions to
|
||||
no longer place the action into the actions table when a conflict is
|
||||
detected. This behavior is an improvement on the old way actions are
|
||||
processed as the requester will now receive immediate feedback from the
|
||||
API when an action cannot be processed. This release also honors the
|
||||
scaling action cooldown in the same manner by erring via the API when a
|
||||
scaling action cannot be processed due to cooldown.
|
||||
features:
|
||||
- |
|
||||
Scaling actions (IN or OUT) now validate that there is no conflicting
|
||||
action already being processed and will return an error via the API
|
||||
informing the end user if a conflict is detected. A conflicting action is
|
||||
detected when a new action of either `CLUSTER_SCALE_IN` or
|
||||
`CLUSTER_SCALE_OUT` is attempted while there is already a cluster scaling
|
||||
action in the action table in a pending status (READY, RUNNING, WAITING,
|
||||
ACTION_WAITING_LIFECYCLE_COMPLETION).
|
||||
Additionally the cooldown will be checked and enforced when a scaling
|
||||
action is requested. If the cooldown is being observed the requester will
|
||||
be informed of this when submitting the action via an error.
|
|
@ -42,6 +42,8 @@ class FaultWrapper(wsgi.Middleware):
|
|||
"""Replace error body with something the client can parse."""
|
||||
|
||||
error_map = {
|
||||
'ActionConflict': webob.exc.HTTPConflict,
|
||||
'ActionCooldown': webob.exc.HTTPConflict,
|
||||
'ActionInProgress': webob.exc.HTTPConflict,
|
||||
'BadRequest': webob.exc.HTTPBadRequest,
|
||||
'FeatureNotSupported': webob.exc.HTTPConflict,
|
||||
|
@ -57,7 +59,6 @@ class FaultWrapper(wsgi.Middleware):
|
|||
'RequestLimitExceeded': webob.exc.HTTPBadRequest,
|
||||
'ResourceInUse': webob.exc.HTTPConflict,
|
||||
'ResourceIsLocked': webob.exc.HTTPConflict,
|
||||
'ActionConflict': webob.exc.HTTPConflict,
|
||||
'ResourceNotFound': webob.exc.HTTPNotFound,
|
||||
}
|
||||
|
||||
|
|
|
@ -111,3 +111,9 @@ it can be used by both users and developers.
|
|||
are now sent directly in the query body rather than in the params
|
||||
field.
|
||||
|
||||
1.11
|
||||
----
|
||||
- Modified the ``cluster_action`` API. The API now responds with
|
||||
response code 409 when a scaling action conflicts with one already
|
||||
being processed or a cooldown for a scaling action is encountered.
|
||||
|
||||
|
|
|
@ -24,7 +24,7 @@ class VersionController(object):
|
|||
# This includes any semantic changes which may not affect the input or
|
||||
# output formats or even originate in the API code layer.
|
||||
_MIN_API_VERSION = "1.0"
|
||||
_MAX_API_VERSION = "1.10"
|
||||
_MAX_API_VERSION = "1.11"
|
||||
|
||||
DEFAULT_API_VERSION = _MIN_API_VERSION
|
||||
|
||||
|
|
|
@ -64,6 +64,8 @@ CLUSTER_ACTION_NAMES = (
|
|||
'CLUSTER_OPERATION',
|
||||
)
|
||||
|
||||
CLUSTER_SCALE_ACTIONS = [CLUSTER_SCALE_IN, CLUSTER_SCALE_OUT]
|
||||
|
||||
NODE_ACTION_NAMES = (
|
||||
NODE_CREATE, NODE_DELETE, NODE_UPDATE,
|
||||
NODE_JOIN, NODE_LEAVE,
|
||||
|
|
|
@ -194,6 +194,12 @@ class ActionConflict(SenlinException):
|
|||
"the following action(s): %(actions)s")
|
||||
|
||||
|
||||
class ActionCooldown(SenlinException):
|
||||
msg_fmt = _("The %(type)s action for cluster %(cluster)s cannot be "
|
||||
"processed due to Policy %(policy_id)s cooldown still in "
|
||||
"progress")
|
||||
|
||||
|
||||
class NodeNotOrphan(SenlinException):
|
||||
msg_fmt = _("%(message)s")
|
||||
|
||||
|
@ -202,8 +208,8 @@ class InternalError(SenlinException):
|
|||
"""A base class for internal exceptions in senlin.
|
||||
|
||||
The internal exception classes which inherit from :class:`SenlinException`
|
||||
class should be translated to a user facing exception type if need to be
|
||||
made user visible.
|
||||
class should be translated to a user facing exception type if they need to
|
||||
be made user visible.
|
||||
"""
|
||||
msg_fmt = _("%(message)s")
|
||||
message = _('Internal error happened')
|
||||
|
|
|
@ -329,6 +329,11 @@ def action_get(context, action_id, project_safe=True, refresh=False):
|
|||
refresh=refresh)
|
||||
|
||||
|
||||
def action_list_active_scaling(context, cluster_id, project_safe=True):
|
||||
return IMPL.action_list_active_scaling(context, cluster_id,
|
||||
project_safe=project_safe)
|
||||
|
||||
|
||||
def action_get_by_name(context, name, project_safe=True):
|
||||
return IMPL.action_get_by_name(context, name, project_safe=project_safe)
|
||||
|
||||
|
|
|
@ -1067,6 +1067,25 @@ def action_get(context, action_id, project_safe=True, refresh=False):
|
|||
return action
|
||||
|
||||
|
||||
def action_list_active_scaling(context, cluster_id=None, project_safe=True):
|
||||
with session_for_read() as session:
|
||||
query = session.query(models.Action)
|
||||
if project_safe:
|
||||
query = query.filter_by(project=context.project_id)
|
||||
if cluster_id:
|
||||
query = query.filter_by(target=cluster_id)
|
||||
query = query.filter(
|
||||
models.Action.status.in_(
|
||||
[consts.ACTION_READY,
|
||||
consts.ACTION_WAITING,
|
||||
consts.ACTION_RUNNING,
|
||||
consts.ACTION_WAITING_LIFECYCLE_COMPLETION]))
|
||||
query = query.filter(
|
||||
models.Action.action.in_(consts.CLUSTER_SCALE_ACTIONS))
|
||||
scaling_actions = query.all()
|
||||
return scaling_actions
|
||||
|
||||
|
||||
def action_get_by_name(context, name, project_safe=True):
|
||||
return query_by_name(context, models.Action, name,
|
||||
project_safe=project_safe)
|
||||
|
|
|
@ -274,6 +274,9 @@ class Action(object):
|
|||
}
|
||||
c = req_context.RequestContext.from_dict(params)
|
||||
|
||||
if action in consts.CLUSTER_SCALE_ACTIONS:
|
||||
Action.validate_scaling_action(c, target, action)
|
||||
|
||||
obj = cls(target, action, c, **kwargs)
|
||||
return obj.store(ctx)
|
||||
|
||||
|
@ -466,6 +469,70 @@ class Action(object):
|
|||
return
|
||||
return
|
||||
|
||||
@staticmethod
|
||||
def validate_scaling_action(ctx, cluster_id, action):
|
||||
"""Validate scaling action against actions table and policy cooldown.
|
||||
|
||||
:param ctx: An instance of the request context.
|
||||
:param cluster_id: ID of the cluster the scaling action is targeting.
|
||||
:param action: Scaling action being validated.
|
||||
:return: None
|
||||
:raises: An exception of ``ActionCooldown`` when the action being
|
||||
validated is still in cooldown based off the policy or
|
||||
``ActionConflict`` when a scaling action is already in the action
|
||||
table.
|
||||
"""
|
||||
# Check for conflicting actions in the actions table.
|
||||
conflicting_actions = Action._get_conflicting_scaling_actions(
|
||||
ctx, cluster_id)
|
||||
if conflicting_actions:
|
||||
action_ids = [a.get('id', None) for a in conflicting_actions]
|
||||
LOG.info("Unable to process %(action)s for cluster %(cluster_id)s "
|
||||
"the action conflicts with %(conflicts)s",
|
||||
{'action': action,
|
||||
'cluster_id': cluster_id,
|
||||
'conflicts': action_ids})
|
||||
raise exception.ActionConflict(
|
||||
type=action,
|
||||
target=cluster_id,
|
||||
actions=",".join(action_ids))
|
||||
|
||||
# Check to see if action cooldown should be observed.
|
||||
bindings = cpo.ClusterPolicy.get_all(ctx, cluster_id,
|
||||
sort='priority',
|
||||
filters={'enabled': True})
|
||||
for pb in bindings:
|
||||
policy = policy_mod.Policy.load(ctx, pb.policy_id)
|
||||
if getattr(policy, 'cooldown', None) and policy.event == action:
|
||||
if pb.last_op and not timeutils.is_older_than(
|
||||
pb.last_op, policy.cooldown):
|
||||
LOG.info("Unable to process %(action)s for cluster "
|
||||
"%(cluster_id)s the actions policy %(policy)s "
|
||||
"cooldown still in progress",
|
||||
{'action': action,
|
||||
'cluster_id': cluster_id,
|
||||
'policy': pb.policy_id})
|
||||
raise exception.ActionCooldown(
|
||||
type=action,
|
||||
cluster=cluster_id,
|
||||
policy_id=pb.policy_id)
|
||||
return
|
||||
|
||||
@staticmethod
|
||||
def _get_conflicting_scaling_actions(ctx, cluster_id):
|
||||
"""Check actions table for conflicting scaling actions.
|
||||
|
||||
:param ctx: An instance of the request context.
|
||||
:param cluster_id: ID of the cluster the scaling action is targeting.
|
||||
:return: A list of conflicting actions.
|
||||
"""
|
||||
scaling_actions = ao.Action.action_list_active_scaling(
|
||||
ctx, cluster_id)
|
||||
if scaling_actions:
|
||||
return [a.to_dict() for a in scaling_actions]
|
||||
else:
|
||||
return None
|
||||
|
||||
def to_dict(self):
|
||||
if self.id:
|
||||
dep_on = dobj.Dependency.get_depended(self.context, self.id)
|
||||
|
|
|
@ -2511,7 +2511,7 @@ class EngineService(service.Service):
|
|||
receiver = receiver_obj.Receiver.find(ctx, identity)
|
||||
|
||||
try:
|
||||
cluster = co.Cluster.find(ctx, receiver.cluster_id)
|
||||
db_cluster = co.Cluster.find(ctx, receiver.cluster_id)
|
||||
except (exception.ResourceNotFound, exception.MultipleChoices) as ex:
|
||||
msg = ex.enhance_msg('referenced', ex)
|
||||
raise exception.BadRequest(msg=msg)
|
||||
|
@ -2527,7 +2527,7 @@ class EngineService(service.Service):
|
|||
'inputs': data
|
||||
}
|
||||
|
||||
action_id = action_mod.Action.create(ctx, cluster.id,
|
||||
action_id = action_mod.Action.create(ctx, db_cluster.id,
|
||||
receiver.action, **kwargs)
|
||||
dispatcher.start_action()
|
||||
LOG.info("Webhook %(w)s triggered with action queued: %(a)s.",
|
||||
|
|
|
@ -95,6 +95,11 @@ class Action(base.SenlinObject, base.VersionedObjectDictCompat):
|
|||
obj = db_api.action_get_by_short_id(context, short_id, **kwargs)
|
||||
return cls._from_db_object(context, cls(), obj)
|
||||
|
||||
@classmethod
|
||||
def action_list_active_scaling(cls, context, cluster_id, **kwargs):
|
||||
objs = db_api.action_list_active_scaling(context, cluster_id, **kwargs)
|
||||
return [cls._from_db_object(context, cls(), obj) for obj in objs]
|
||||
|
||||
@classmethod
|
||||
def get_all(cls, context, **kwargs):
|
||||
objs = db_api.action_get_all(context, **kwargs)
|
||||
|
|
|
@ -94,6 +94,11 @@ class ActionBaseTest(base.SenlinTestCase):
|
|||
self.assertIsNone(obj.updated_at)
|
||||
self.assertEqual({}, obj.data)
|
||||
|
||||
def _create_cp_binding(self, cluster_id, policy_id):
|
||||
return cpo.ClusterPolicy(cluster_id=cluster_id, policy_id=policy_id,
|
||||
enabled=True, id=uuidutils.generate_uuid(),
|
||||
last_op=None)
|
||||
|
||||
@mock.patch.object(cluster_mod.Cluster, 'load')
|
||||
def test_action_new_cluster(self, mock_load):
|
||||
fake_cluster = mock.Mock(timeout=cfg.CONF.default_action_timeout)
|
||||
|
@ -337,6 +342,43 @@ class ActionBaseTest(base.SenlinTestCase):
|
|||
mock_store.assert_not_called()
|
||||
mock_active.assert_called_once_with(mock.ANY, OBJID)
|
||||
|
||||
@mock.patch.object(timeutils, 'is_older_than')
|
||||
@mock.patch.object(cpo.ClusterPolicy, 'get_all')
|
||||
@mock.patch.object(policy_mod.Policy, 'load')
|
||||
@mock.patch.object(ab.Action, 'store')
|
||||
def test_action_create_scaling_cooldown_in_progress(self, mock_store,
|
||||
mock_load,
|
||||
mock_load_all,
|
||||
mock_time_util):
|
||||
cluster_id = CLUSTER_ID
|
||||
# Note: policy is mocked
|
||||
policy_id = uuidutils.generate_uuid()
|
||||
policy = mock.Mock(id=policy_id,
|
||||
TARGET=[('AFTER', 'CLUSTER_SCALE_OUT')],
|
||||
event='CLUSTER_SCALE_OUT',
|
||||
cooldown=240)
|
||||
pb = self._create_cp_binding(cluster_id, policy.id)
|
||||
pb.last_op = timeutils.utcnow(True)
|
||||
mock_load_all.return_value = [pb]
|
||||
mock_load.return_value = policy
|
||||
mock_time_util.return_value = False
|
||||
self.assertRaises(exception.ActionCooldown, ab.Action.create, self.ctx,
|
||||
cluster_id, 'CLUSTER_SCALE_OUT')
|
||||
self.assertEqual(0, mock_store.call_count)
|
||||
|
||||
@mock.patch.object(ao.Action, 'action_list_active_scaling')
|
||||
@mock.patch.object(ab.Action, 'store')
|
||||
def test_action_create_scaling_conflict(self, mock_store,
|
||||
mock_list_active):
|
||||
cluster_id = CLUSTER_ID
|
||||
|
||||
mock_action = mock.Mock()
|
||||
mock_action.to_dict.return_value = {'id': 'fake_action_id'}
|
||||
mock_list_active.return_value = [mock_action]
|
||||
self.assertRaises(exception.ActionConflict, ab.Action.create, self.ctx,
|
||||
cluster_id, 'CLUSTER_SCALE_IN')
|
||||
self.assertEqual(0, mock_store.call_count)
|
||||
|
||||
def test_action_delete(self):
|
||||
result = ab.Action.delete(self.ctx, 'non-existent')
|
||||
self.assertIsNone(result)
|
||||
|
|
Loading…
Reference in New Issue