diff --git a/api-ref/source/clusters.inc b/api-ref/source/clusters.inc index ff8b880b5..209fc6315 100644 --- a/api-ref/source/clusters.inc +++ b/api-ref/source/clusters.inc @@ -437,6 +437,7 @@ Response Codes - 401 - 403 - 404 + - 409 - 503 Request Parameters @@ -494,6 +495,7 @@ Response Codes - 401 - 403 - 404 + - 409 - 503 Request Parameters diff --git a/doc/source/user/policy_types.rst b/doc/source/user/policy_types.rst index 9afa028a6..adfcc974b 100644 --- a/doc/source/user/policy_types.rst +++ b/doc/source/user/policy_types.rst @@ -22,7 +22,7 @@ Concept ~~~~~~~ A :term:`Policy Type` is an abstract specification of the rules to be checked -and/or enforced when certain :term:`Action` is performed on a cluster that +and/or enforced when an :term:`Action` is performed on a cluster that contains nodes of certain :term:`Profile Type`. A registry of policy types is built in memory when the Senlin engine diff --git a/doc/source/user/policy_types/scaling.rst b/doc/source/user/policy_types/scaling.rst index 45da4cec0..5c3d09c9e 100644 --- a/doc/source/user/policy_types/scaling.rst +++ b/doc/source/user/policy_types/scaling.rst @@ -49,14 +49,16 @@ scaling behavior in both directions using the same policy. Senlin has carefully designed the builtin policy types so that for scaling policies, you can attach more than one instance of the same policy type but -you may get an error when you are attempting to attach two policies of anther +you may get an error when you are attempting to attach two policies of another type (say ``senlin.policy.deletion``) to the same cluster. The value of the ``event`` property indicates when the policy will be checked. A policy with ``event`` set to "``CLUSTER_SCALE_IN``" will be checked when and only when a corresponding action is triggered on the cluster. A policy with ``event`` set to "``CLUSTER_SCALE_OUT``" will be checked when and only when -a corresponding action is triggered. +a corresponding action is triggered. If the cluster is currently processing a +scaling action it will not accept another scaling action until the current +action has been processed and cooldown has been observed. For both types of actions that can triggered the scaling policy, there are always three types of adjustments to choose from as listed below. The type diff --git a/releasenotes/notes/cluster-scale-action-conflict-0e1e64591e943e25.yaml b/releasenotes/notes/cluster-scale-action-conflict-0e1e64591e943e25.yaml new file mode 100644 index 000000000..d79a9803b --- /dev/null +++ b/releasenotes/notes/cluster-scale-action-conflict-0e1e64591e943e25.yaml @@ -0,0 +1,21 @@ +--- +prelude: > + This release alters the cluster_scale_in and cluster_scale_out actions to + no longer place the action into the actions table when a conflict is + detected. This behavior is an improvement on the old way actions are + processed as the requester will now receive immediate feedback from the + API when an action cannot be processed. This release also honors the + scaling action cooldown in the same manner by erring via the API when a + scaling action cannot be processed due to cooldown. +features: + - | + Scaling actions (IN or OUT) now validate that there is no conflicting + action already being processed and will return an error via the API + informing the end user if a conflict is detected. A conflicting action is + detected when new action of either `CLUSTER_SCALE_IN` or + `CLUSTER_SCALE_OUT` is attempted while there is already cluster scaling + action in the action table in a pending status (READY, RUNNING, WAITING, + ACTION_WAITING_LIFECYCLE_COMPLETION). + Additinally the cooldown will be checked and enforced when a scaling + action is requested. If the cooldown is being observed the requester will + be informed of this when submitting the action via an error. diff --git a/senlin/api/middleware/fault.py b/senlin/api/middleware/fault.py index 3886ba598..c2f3d7891 100644 --- a/senlin/api/middleware/fault.py +++ b/senlin/api/middleware/fault.py @@ -42,6 +42,8 @@ class FaultWrapper(wsgi.Middleware): """Replace error body with something the client can parse.""" error_map = { + 'ActionConflict': webob.exc.HTTPConflict, + 'ActionCooldown': webob.exc.HTTPConflict, 'ActionInProgress': webob.exc.HTTPConflict, 'BadRequest': webob.exc.HTTPBadRequest, 'FeatureNotSupported': webob.exc.HTTPConflict, @@ -57,7 +59,6 @@ class FaultWrapper(wsgi.Middleware): 'RequestLimitExceeded': webob.exc.HTTPBadRequest, 'ResourceInUse': webob.exc.HTTPConflict, 'ResourceIsLocked': webob.exc.HTTPConflict, - 'ActionConflict': webob.exc.HTTPConflict, 'ResourceNotFound': webob.exc.HTTPNotFound, } diff --git a/senlin/api/openstack/history.rst b/senlin/api/openstack/history.rst index 76c320ce1..eebef466f 100755 --- a/senlin/api/openstack/history.rst +++ b/senlin/api/openstack/history.rst @@ -111,3 +111,9 @@ it can be used by both users and developers. are now sent directly in the query body rather than in the params field. +1.11 +---- +- Modified the ``cluster_action`` API. The API now responds with + response code 409 when a scaling action conflicts with one already + being processed or a cooldown for a scaling action is encountered. + diff --git a/senlin/api/openstack/v1/version.py b/senlin/api/openstack/v1/version.py index 4d0fabf3a..204778282 100755 --- a/senlin/api/openstack/v1/version.py +++ b/senlin/api/openstack/v1/version.py @@ -24,7 +24,7 @@ class VersionController(object): # This includes any semantic changes which may not affect the input or # output formats or even originate in the API code layer. _MIN_API_VERSION = "1.0" - _MAX_API_VERSION = "1.10" + _MAX_API_VERSION = "1.11" DEFAULT_API_VERSION = _MIN_API_VERSION diff --git a/senlin/common/consts.py b/senlin/common/consts.py index 783782131..3171ceedc 100755 --- a/senlin/common/consts.py +++ b/senlin/common/consts.py @@ -64,6 +64,8 @@ CLUSTER_ACTION_NAMES = ( 'CLUSTER_OPERATION', ) +CLUSTER_SCALE_ACTIONS = [CLUSTER_SCALE_IN, CLUSTER_SCALE_OUT] + NODE_ACTION_NAMES = ( NODE_CREATE, NODE_DELETE, NODE_UPDATE, NODE_JOIN, NODE_LEAVE, diff --git a/senlin/common/exception.py b/senlin/common/exception.py index be8a480c4..fadaf685d 100644 --- a/senlin/common/exception.py +++ b/senlin/common/exception.py @@ -194,6 +194,12 @@ class ActionConflict(SenlinException): "the following action(s): %(actions)s") +class ActionCooldown(SenlinException): + msg_fmt = _("The %(type)s action for cluster %(cluster)s cannot be " + "processed due to Policy %(policy_id)s cooldown still in " + "progress") + + class NodeNotOrphan(SenlinException): msg_fmt = _("%(message)s") @@ -202,8 +208,8 @@ class InternalError(SenlinException): """A base class for internal exceptions in senlin. The internal exception classes which inherit from :class:`SenlinException` - class should be translated to a user facing exception type if need to be - made user visible. + class should be translated to a user facing exception type if they need to + be made user visible. """ msg_fmt = _("%(message)s") message = _('Internal error happened') diff --git a/senlin/db/api.py b/senlin/db/api.py index 592a598e3..a10595e6c 100755 --- a/senlin/db/api.py +++ b/senlin/db/api.py @@ -329,6 +329,11 @@ def action_get(context, action_id, project_safe=True, refresh=False): refresh=refresh) +def action_list_active_scaling(context, cluster_id, project_safe=True): + return IMPL.action_list_active_scaling(context, cluster_id, + project_safe=project_safe) + + def action_get_by_name(context, name, project_safe=True): return IMPL.action_get_by_name(context, name, project_safe=project_safe) diff --git a/senlin/db/sqlalchemy/api.py b/senlin/db/sqlalchemy/api.py index d7fba5936..911b59d28 100755 --- a/senlin/db/sqlalchemy/api.py +++ b/senlin/db/sqlalchemy/api.py @@ -1067,6 +1067,25 @@ def action_get(context, action_id, project_safe=True, refresh=False): return action +def action_list_active_scaling(context, cluster_id=None, project_safe=True): + with session_for_read() as session: + query = session.query(models.Action) + if project_safe: + query = query.filter_by(project=context.project_id) + if cluster_id: + query = query.filter_by(target=cluster_id) + query = query.filter( + models.Action.status.in_( + [consts.ACTION_READY, + consts.ACTION_WAITING, + consts.ACTION_RUNNING, + consts.ACTION_WAITING_LIFECYCLE_COMPLETION])) + query = query.filter( + models.Action.action.in_(consts.CLUSTER_SCALE_ACTIONS)) + scaling_actions = query.all() + return scaling_actions + + def action_get_by_name(context, name, project_safe=True): return query_by_name(context, models.Action, name, project_safe=project_safe) diff --git a/senlin/engine/actions/base.py b/senlin/engine/actions/base.py index fa95b6361..428658270 100755 --- a/senlin/engine/actions/base.py +++ b/senlin/engine/actions/base.py @@ -274,6 +274,9 @@ class Action(object): } c = req_context.RequestContext.from_dict(params) + if action in consts.CLUSTER_SCALE_ACTIONS: + Action.validate_scaling_action(c, target, action) + obj = cls(target, action, c, **kwargs) return obj.store(ctx) @@ -466,6 +469,70 @@ class Action(object): return return + @staticmethod + def validate_scaling_action(ctx, cluster_id, action): + """Validate scaling action against actions table and policy cooldown. + + :param ctx: An instance of the request context. + :param cluster_id: ID of the cluster the scaling action is targeting. + :param action: Scaling action being validated. + :return: None + :raises: An exception of ``ActionCooldown`` when the action being + validated is still in cooldown based off the policy or + ``ActionConflict`` when a scaling action is already in the action + table. + """ + # Check for conflicting actions in the actions table. + conflicting_actions = Action._get_conflicting_scaling_actions( + ctx, cluster_id) + if conflicting_actions: + action_ids = [a.get('id', None) for a in conflicting_actions] + LOG.info("Unable to process %(action)s for cluster %(cluster_id)s " + "the action conflicts with %(conflicts)s", + {'action': action, + 'cluster_id': cluster_id, + 'conflicts': action_ids}) + raise exception.ActionConflict( + type=action, + target=cluster_id, + actions=",".join(action_ids)) + + # Check to see if action cooldown should be observed. + bindings = cpo.ClusterPolicy.get_all(ctx, cluster_id, + sort='priority', + filters={'enabled': True}) + for pb in bindings: + policy = policy_mod.Policy.load(ctx, pb.policy_id) + if getattr(policy, 'cooldown', None) and policy.event == action: + if pb.last_op and not timeutils.is_older_than( + pb.last_op, policy.cooldown): + LOG.info("Unable to process %(action)s for cluster " + "%(cluster_id)s the actions policy %(policy)s " + "cooldown still in progress", + {'action': action, + 'cluster_id': cluster_id, + 'policy': pb.policy_id}) + raise exception.ActionCooldown( + type=action, + cluster=cluster_id, + policy_id=pb.policy_id) + return + + @staticmethod + def _get_conflicting_scaling_actions(ctx, cluster_id): + """Check actions table for conflicting scaling actions. + + :param ctx: An instance of the request context. + :param cluster_id: ID of the cluster the scaling action is targeting. + :return: A list of conflicting actions. + """ + scaling_actions = ao.Action.action_list_active_scaling( + ctx, cluster_id) + if scaling_actions: + return [a.to_dict() for a in scaling_actions] + else: + return None + def to_dict(self): if self.id: dep_on = dobj.Dependency.get_depended(self.context, self.id) diff --git a/senlin/engine/service.py b/senlin/engine/service.py index 33d17f050..ba1a313c1 100755 --- a/senlin/engine/service.py +++ b/senlin/engine/service.py @@ -2511,7 +2511,7 @@ class EngineService(service.Service): receiver = receiver_obj.Receiver.find(ctx, identity) try: - cluster = co.Cluster.find(ctx, receiver.cluster_id) + db_cluster = co.Cluster.find(ctx, receiver.cluster_id) except (exception.ResourceNotFound, exception.MultipleChoices) as ex: msg = ex.enhance_msg('referenced', ex) raise exception.BadRequest(msg=msg) @@ -2527,7 +2527,7 @@ class EngineService(service.Service): 'inputs': data } - action_id = action_mod.Action.create(ctx, cluster.id, + action_id = action_mod.Action.create(ctx, db_cluster.id, receiver.action, **kwargs) dispatcher.start_action() LOG.info("Webhook %(w)s triggered with action queued: %(a)s.", diff --git a/senlin/objects/action.py b/senlin/objects/action.py index 64f9d7860..870a56ce2 100755 --- a/senlin/objects/action.py +++ b/senlin/objects/action.py @@ -95,6 +95,11 @@ class Action(base.SenlinObject, base.VersionedObjectDictCompat): obj = db_api.action_get_by_short_id(context, short_id, **kwargs) return cls._from_db_object(context, cls(), obj) + @classmethod + def action_list_active_scaling(cls, context, cluster_id, **kwargs): + objs = db_api.action_list_active_scaling(context, cluster_id, **kwargs) + return [cls._from_db_object(context, cls(), obj) for obj in objs] + @classmethod def get_all(cls, context, **kwargs): objs = db_api.action_get_all(context, **kwargs) diff --git a/senlin/tests/unit/engine/actions/test_action_base.py b/senlin/tests/unit/engine/actions/test_action_base.py index 213ceb9bc..bdf0ba1e5 100755 --- a/senlin/tests/unit/engine/actions/test_action_base.py +++ b/senlin/tests/unit/engine/actions/test_action_base.py @@ -94,6 +94,11 @@ class ActionBaseTest(base.SenlinTestCase): self.assertIsNone(obj.updated_at) self.assertEqual({}, obj.data) + def _create_cp_binding(self, cluster_id, policy_id): + return cpo.ClusterPolicy(cluster_id=cluster_id, policy_id=policy_id, + enabled=True, id=uuidutils.generate_uuid(), + last_op=None) + @mock.patch.object(cluster_mod.Cluster, 'load') def test_action_new_cluster(self, mock_load): fake_cluster = mock.Mock(timeout=cfg.CONF.default_action_timeout) @@ -337,6 +342,43 @@ class ActionBaseTest(base.SenlinTestCase): mock_store.assert_not_called() mock_active.assert_called_once_with(mock.ANY, OBJID) + @mock.patch.object(timeutils, 'is_older_than') + @mock.patch.object(cpo.ClusterPolicy, 'get_all') + @mock.patch.object(policy_mod.Policy, 'load') + @mock.patch.object(ab.Action, 'store') + def test_action_create_scaling_cooldown_in_progress(self, mock_store, + mock_load, + mock_load_all, + mock_time_util): + cluster_id = CLUSTER_ID + # Note: policy is mocked + policy_id = uuidutils.generate_uuid() + policy = mock.Mock(id=policy_id, + TARGET=[('AFTER', 'CLUSTER_SCALE_OUT')], + event='CLUSTER_SCALE_OUT', + cooldown=240) + pb = self._create_cp_binding(cluster_id, policy.id) + pb.last_op = timeutils.utcnow(True) + mock_load_all.return_value = [pb] + mock_load.return_value = policy + mock_time_util.return_value = False + self.assertRaises(exception.ActionCooldown, ab.Action.create, self.ctx, + cluster_id, 'CLUSTER_SCALE_OUT') + self.assertEqual(0, mock_store.call_count) + + @mock.patch.object(ao.Action, 'action_list_active_scaling') + @mock.patch.object(ab.Action, 'store') + def test_action_create_scaling_conflict(self, mock_store, + mock_list_active): + cluster_id = CLUSTER_ID + + mock_action = mock.Mock() + mock_action.to_dict.return_value = {'id': 'fake_action_id'} + mock_list_active.return_value = [mock_action] + self.assertRaises(exception.ActionConflict, ab.Action.create, self.ctx, + cluster_id, 'CLUSTER_SCALE_IN') + self.assertEqual(0, mock_store.call_count) + def test_action_delete(self): result = ab.Action.delete(self.ctx, 'non-existent') self.assertIsNone(result)