diff --git a/karbor/api/v1/scheduled_operations.py b/karbor/api/v1/scheduled_operations.py index dbf2744b..fd3306da 100644 --- a/karbor/api/v1/scheduled_operations.py +++ b/karbor/api/v1/scheduled_operations.py @@ -248,6 +248,7 @@ class ScheduledOperationController(wsgi.Controller): context, operation_id, trigger_id) except (exception.InvalidInput, + exception.ScheduledOperationExist, exception.TriggerIsInvalid) as ex: raise exc.HTTPBadRequest(explanation=ex.msg) diff --git a/karbor/exception.py b/karbor/exception.py index c167d684..4e105ee5 100644 --- a/karbor/exception.py +++ b/karbor/exception.py @@ -162,6 +162,10 @@ class InvalidInput(Invalid): message = _("Invalid input received: %(reason)s") +class ScheduledOperationExist(Invalid): + message = _("Scheduled Operation%(op_id)s exists") + + class NotFound(KarborException): message = _("Resource could not be found.") code = 404 diff --git a/karbor/services/operationengine/api.py b/karbor/services/operationengine/api.py index 37860fed..1267059c 100644 --- a/karbor/services/operationengine/api.py +++ b/karbor/services/operationengine/api.py @@ -41,6 +41,14 @@ class API(base.Base): self.operationengine_rpcapi.delete_scheduled_operation( context, operation_id, trigger_id) + def suspend_scheduled_operation(self, context, operation_id, trigger_id): + self.operationengine_rpcapi.suspend_scheduled_operation( + context, operation_id, trigger_id) + + def resume_scheduled_operation(self, context, operation_id, trigger_id): + self.operationengine_rpcapi.resume_scheduled_operation( + context, operation_id, trigger_id) + def create_trigger(self, context, trigger): self.operationengine_rpcapi.create_trigger(context, trigger) diff --git a/karbor/services/operationengine/engine/triggers/timetrigger/time_trigger.py b/karbor/services/operationengine/engine/triggers/timetrigger/time_trigger.py index 4fbb25b0..618ab5e1 100644 --- a/karbor/services/operationengine/engine/triggers/timetrigger/time_trigger.py +++ b/karbor/services/operationengine/engine/triggers/timetrigger/time_trigger.py @@ -103,7 +103,7 @@ class TimeTrigger(triggers.BaseTrigger): def register_operation(self, operation_id, **kwargs): if operation_id in self._operation_ids: msg = (_("The operation_id(%s) is exist") % operation_id) - raise exception.InvalidInput(msg) + raise exception.ScheduledOperationExist(msg) if self._greenthread and not self._greenthread.running: raise exception.TriggerIsInvalid(trigger_id=self._id) diff --git a/karbor/services/operationengine/manager.py b/karbor/services/operationengine/manager.py index 48903ba6..28c47eae 100644 --- a/karbor/services/operationengine/manager.py +++ b/karbor/services/operationengine/manager.py @@ -95,8 +95,11 @@ class OperationEngineManager(manager.Manager): break for state in states: - resume = (state.state in resume_states) operation = state.operation + if not operation.enabled: + continue + + resume = (state.state in resume_states) self._trigger_manager.register_operation( operation.trigger_id, operation.id, resume=resume, end_time_for_run=state.end_time_for_run) @@ -111,7 +114,8 @@ class OperationEngineManager(manager.Manager): @messaging.expected_exceptions(exception.TriggerNotFound, exception.InvalidInput, exception.TriggerIsInvalid, - exception.AuthorizationFailure) + exception.AuthorizationFailure, + exception.ScheduledOperationExist) def create_scheduled_operation(self, context, operation_id, trigger_id): LOG.debug("Create scheduled operation.") @@ -150,6 +154,24 @@ class OperationEngineManager(manager.Manager): self._trigger_manager.unregister_operation(trigger_id, operation_id) self._user_trust_manager.delete_operation(context, operation_id) + @messaging.expected_exceptions(exception.TriggerNotFound) + def suspend_scheduled_operation(self, context, operation_id, trigger_id): + LOG.debug("Suspend scheduled operation.") + self._trigger_manager.unregister_operation(trigger_id, operation_id) + + @messaging.expected_exceptions(exception.TriggerNotFound, + exception.TriggerIsInvalid) + def resume_scheduled_operation(self, context, operation_id, trigger_id): + LOG.debug("Resume scheduled operation.") + + try: + self._trigger_manager.register_operation( + trigger_id, operation_id) + except exception.ScheduledOperationExist: + pass + except Exception: + raise + @messaging.expected_exceptions(exception.InvalidInput) def create_trigger(self, context, trigger): self._trigger_manager.add_trigger(trigger.id, trigger.type, diff --git a/karbor/services/operationengine/rpcapi.py b/karbor/services/operationengine/rpcapi.py index d67a23f6..be5676fd 100644 --- a/karbor/services/operationengine/rpcapi.py +++ b/karbor/services/operationengine/rpcapi.py @@ -40,27 +40,35 @@ class OperationEngineAPI(object): version=self.RPC_API_VERSION) serializer = objects_base.KarborObjectSerializer() - self.client = rpc.get_client(target, version_cap=None, - serializer=serializer) + client = rpc.get_client(target, version_cap=None, + serializer=serializer) + self._client = client.prepare(version='1.0') def create_scheduled_operation(self, ctxt, operation_id, trigger_id): - cctxt = self.client.prepare(version='1.0') - return cctxt.call(ctxt, 'create_scheduled_operation', - operation_id=operation_id, trigger_id=trigger_id) + return self._client.call(ctxt, 'create_scheduled_operation', + operation_id=operation_id, + trigger_id=trigger_id) def delete_scheduled_operation(self, ctxt, operation_id, trigger_id): - cctxt = self.client.prepare(version='1.0') - return cctxt.call(ctxt, 'delete_scheduled_operation', - operation_id=operation_id, trigger_id=trigger_id) + return self._client.call(ctxt, 'delete_scheduled_operation', + operation_id=operation_id, + trigger_id=trigger_id) + + def suspend_scheduled_operation(self, ctxt, operation_id, trigger_id): + return self._client.call(ctxt, 'suspend_scheduled_operation', + operation_id=operation_id, + trigger_id=trigger_id) + + def resume_scheduled_operation(self, ctxt, operation_id, trigger_id): + return self._client.call(ctxt, 'resume_scheduled_operation', + operation_id=operation_id, + trigger_id=trigger_id) def create_trigger(self, ctxt, trigger): - cctxt = self.client.prepare(version='1.0') - return cctxt.call(ctxt, 'create_trigger', trigger=trigger) + return self._client.call(ctxt, 'create_trigger', trigger=trigger) def delete_trigger(self, ctxt, trigger_id): - cctxt = self.client.prepare(version='1.0') - return cctxt.call(ctxt, 'delete_trigger', trigger_id=trigger_id) + return self._client.call(ctxt, 'delete_trigger', trigger_id=trigger_id) def update_trigger(self, ctxt, trigger): - cctxt = self.client.prepare(version='1.0') - return cctxt.call(ctxt, 'update_trigger', trigger=trigger) + return self._client.call(ctxt, 'update_trigger', trigger=trigger) diff --git a/karbor/tests/unit/operationengine/engine/triggers/timetrigger/test_time_trigger.py b/karbor/tests/unit/operationengine/engine/triggers/timetrigger/test_time_trigger.py index 19bfd8eb..8bff10e2 100644 --- a/karbor/tests/unit/operationengine/engine/triggers/timetrigger/test_time_trigger.py +++ b/karbor/tests/unit/operationengine/engine/triggers/timetrigger/test_time_trigger.py @@ -94,7 +94,7 @@ class TimeTriggerTestCase(base.TestCase): operation_id = "1" self._register_operation(operation_id) - self.assertRaisesRegexp(exception.InvalidInput, + self.assertRaisesRegexp(exception.ScheduledOperationExist, "The operation_id.* is exist", self._trigger.register_operation, operation_id) diff --git a/karbor/tests/unit/operationengine/test_manager.py b/karbor/tests/unit/operationengine/test_manager.py index af61d809..0359bb21 100644 --- a/karbor/tests/unit/operationengine/test_manager.py +++ b/karbor/tests/unit/operationengine/test_manager.py @@ -10,10 +10,12 @@ # License for the specific language governing permissions and limitations # under the License. +import mock from oslo_messaging.rpc import dispatcher as rpc_dispatcher from karbor.common import constants from karbor import context +from karbor import exception from karbor import objects from karbor.services.operationengine import manager as service_manager from karbor.tests import base @@ -25,8 +27,13 @@ class FakeTriggerManager(object): self._trigger = {} def register_operation(self, trigger_id, operation_id, **kwargs): - if trigger_id in self._trigger: - self._trigger[trigger_id].append(operation_id) + if trigger_id not in self._trigger: + self._trigger[trigger_id] = [] + + if operation_id in self._trigger[trigger_id]: + raise exception.ScheduledOperationExist(op_id=operation_id) + + self._trigger[trigger_id].append(operation_id) def unregister_operation(self, trigger_id, operation_id, **kwargs): pass @@ -66,11 +73,16 @@ class OperationEngineManagerTestCase(base.TestCase): operation_id = self._operation.id self._create_operation_state(operation_id) + + op = self._create_scheduled_operation(self._trigger.id, False) + self._create_operation_state(op.id) + self.manager._restore() trigger_manager = self.manager._trigger_manager self.assertTrue(trigger_id in trigger_manager._trigger) self.assertTrue(operation_id in trigger_manager._trigger[trigger_id]) + self.assertFalse(op.id in trigger_manager._trigger[trigger_id]) def test_create_operation(self): operation_id = "1234" @@ -97,6 +109,23 @@ class OperationEngineManagerTestCase(base.TestCase): self.ctxt, self._operation.id) self.assertEqual(constants.OPERATION_STATE_DELETED, state.state) + @mock.patch.object(FakeTriggerManager, 'unregister_operation') + def test_suspend_resume_operation(self, unregister): + op_id = 'suspend' + trigger_id = "trigger" + + self.manager.resume_scheduled_operation(self.ctxt, op_id, trigger_id) + self.assertTrue(op_id in ( + self.manager._trigger_manager._trigger[trigger_id])) + + self.manager.resume_scheduled_operation(self.ctxt, op_id, trigger_id) + self.assertTrue(1 == len( + self.manager._trigger_manager._trigger[trigger_id])) + + # resume + self.manager.suspend_scheduled_operation(self.ctxt, op_id, trigger_id) + unregister.assert_called_once_with(trigger_id, op_id) + def _create_one_trigger(self): trigger_info = { 'project_id': "123", @@ -111,7 +140,7 @@ class OperationEngineManagerTestCase(base.TestCase): trigger.create() return trigger - def _create_scheduled_operation(self, trigger_id): + def _create_scheduled_operation(self, trigger_id, enabled=True): operation_info = { "name": "123", 'description': '123', @@ -122,6 +151,7 @@ class OperationEngineManagerTestCase(base.TestCase): "operation_definition": { "plan_id": "" }, + "enabled": enabled } operation = objects.ScheduledOperation(self.ctxt, **operation_info) operation.create()