Add suspend/resume operation interface in Operation Engine
suspend interface will pause the operation and it will not be triggered; resume interface will resume the operation to be triggered. Change-Id: I2d3fb84d53d5bad783bc4b1236091b7fca6727d8 Closes-Bug: #1606467
This commit is contained in:
parent
02bbf26751
commit
29fee84c52
|
@ -248,6 +248,7 @@ class ScheduledOperationController(wsgi.Controller):
|
|||
context, operation_id, trigger_id)
|
||||
|
||||
except (exception.InvalidInput,
|
||||
exception.ScheduledOperationExist,
|
||||
exception.TriggerIsInvalid) as ex:
|
||||
raise exc.HTTPBadRequest(explanation=ex.msg)
|
||||
|
||||
|
|
|
@ -162,6 +162,10 @@ class InvalidInput(Invalid):
|
|||
message = _("Invalid input received: %(reason)s")
|
||||
|
||||
|
||||
class ScheduledOperationExist(Invalid):
|
||||
message = _("Scheduled Operation%(op_id)s exists")
|
||||
|
||||
|
||||
class NotFound(KarborException):
|
||||
message = _("Resource could not be found.")
|
||||
code = 404
|
||||
|
|
|
@ -41,6 +41,14 @@ class API(base.Base):
|
|||
self.operationengine_rpcapi.delete_scheduled_operation(
|
||||
context, operation_id, trigger_id)
|
||||
|
||||
def suspend_scheduled_operation(self, context, operation_id, trigger_id):
|
||||
self.operationengine_rpcapi.suspend_scheduled_operation(
|
||||
context, operation_id, trigger_id)
|
||||
|
||||
def resume_scheduled_operation(self, context, operation_id, trigger_id):
|
||||
self.operationengine_rpcapi.resume_scheduled_operation(
|
||||
context, operation_id, trigger_id)
|
||||
|
||||
def create_trigger(self, context, trigger):
|
||||
self.operationengine_rpcapi.create_trigger(context, trigger)
|
||||
|
||||
|
|
|
@ -103,7 +103,7 @@ class TimeTrigger(triggers.BaseTrigger):
|
|||
def register_operation(self, operation_id, **kwargs):
|
||||
if operation_id in self._operation_ids:
|
||||
msg = (_("The operation_id(%s) is exist") % operation_id)
|
||||
raise exception.InvalidInput(msg)
|
||||
raise exception.ScheduledOperationExist(msg)
|
||||
|
||||
if self._greenthread and not self._greenthread.running:
|
||||
raise exception.TriggerIsInvalid(trigger_id=self._id)
|
||||
|
|
|
@ -95,8 +95,11 @@ class OperationEngineManager(manager.Manager):
|
|||
break
|
||||
|
||||
for state in states:
|
||||
resume = (state.state in resume_states)
|
||||
operation = state.operation
|
||||
if not operation.enabled:
|
||||
continue
|
||||
|
||||
resume = (state.state in resume_states)
|
||||
self._trigger_manager.register_operation(
|
||||
operation.trigger_id, operation.id,
|
||||
resume=resume, end_time_for_run=state.end_time_for_run)
|
||||
|
@ -111,7 +114,8 @@ class OperationEngineManager(manager.Manager):
|
|||
@messaging.expected_exceptions(exception.TriggerNotFound,
|
||||
exception.InvalidInput,
|
||||
exception.TriggerIsInvalid,
|
||||
exception.AuthorizationFailure)
|
||||
exception.AuthorizationFailure,
|
||||
exception.ScheduledOperationExist)
|
||||
def create_scheduled_operation(self, context, operation_id, trigger_id):
|
||||
LOG.debug("Create scheduled operation.")
|
||||
|
||||
|
@ -150,6 +154,24 @@ class OperationEngineManager(manager.Manager):
|
|||
self._trigger_manager.unregister_operation(trigger_id, operation_id)
|
||||
self._user_trust_manager.delete_operation(context, operation_id)
|
||||
|
||||
@messaging.expected_exceptions(exception.TriggerNotFound)
|
||||
def suspend_scheduled_operation(self, context, operation_id, trigger_id):
|
||||
LOG.debug("Suspend scheduled operation.")
|
||||
self._trigger_manager.unregister_operation(trigger_id, operation_id)
|
||||
|
||||
@messaging.expected_exceptions(exception.TriggerNotFound,
|
||||
exception.TriggerIsInvalid)
|
||||
def resume_scheduled_operation(self, context, operation_id, trigger_id):
|
||||
LOG.debug("Resume scheduled operation.")
|
||||
|
||||
try:
|
||||
self._trigger_manager.register_operation(
|
||||
trigger_id, operation_id)
|
||||
except exception.ScheduledOperationExist:
|
||||
pass
|
||||
except Exception:
|
||||
raise
|
||||
|
||||
@messaging.expected_exceptions(exception.InvalidInput)
|
||||
def create_trigger(self, context, trigger):
|
||||
self._trigger_manager.add_trigger(trigger.id, trigger.type,
|
||||
|
|
|
@ -40,27 +40,35 @@ class OperationEngineAPI(object):
|
|||
version=self.RPC_API_VERSION)
|
||||
serializer = objects_base.KarborObjectSerializer()
|
||||
|
||||
self.client = rpc.get_client(target, version_cap=None,
|
||||
serializer=serializer)
|
||||
client = rpc.get_client(target, version_cap=None,
|
||||
serializer=serializer)
|
||||
self._client = client.prepare(version='1.0')
|
||||
|
||||
def create_scheduled_operation(self, ctxt, operation_id, trigger_id):
|
||||
cctxt = self.client.prepare(version='1.0')
|
||||
return cctxt.call(ctxt, 'create_scheduled_operation',
|
||||
operation_id=operation_id, trigger_id=trigger_id)
|
||||
return self._client.call(ctxt, 'create_scheduled_operation',
|
||||
operation_id=operation_id,
|
||||
trigger_id=trigger_id)
|
||||
|
||||
def delete_scheduled_operation(self, ctxt, operation_id, trigger_id):
|
||||
cctxt = self.client.prepare(version='1.0')
|
||||
return cctxt.call(ctxt, 'delete_scheduled_operation',
|
||||
operation_id=operation_id, trigger_id=trigger_id)
|
||||
return self._client.call(ctxt, 'delete_scheduled_operation',
|
||||
operation_id=operation_id,
|
||||
trigger_id=trigger_id)
|
||||
|
||||
def suspend_scheduled_operation(self, ctxt, operation_id, trigger_id):
|
||||
return self._client.call(ctxt, 'suspend_scheduled_operation',
|
||||
operation_id=operation_id,
|
||||
trigger_id=trigger_id)
|
||||
|
||||
def resume_scheduled_operation(self, ctxt, operation_id, trigger_id):
|
||||
return self._client.call(ctxt, 'resume_scheduled_operation',
|
||||
operation_id=operation_id,
|
||||
trigger_id=trigger_id)
|
||||
|
||||
def create_trigger(self, ctxt, trigger):
|
||||
cctxt = self.client.prepare(version='1.0')
|
||||
return cctxt.call(ctxt, 'create_trigger', trigger=trigger)
|
||||
return self._client.call(ctxt, 'create_trigger', trigger=trigger)
|
||||
|
||||
def delete_trigger(self, ctxt, trigger_id):
|
||||
cctxt = self.client.prepare(version='1.0')
|
||||
return cctxt.call(ctxt, 'delete_trigger', trigger_id=trigger_id)
|
||||
return self._client.call(ctxt, 'delete_trigger', trigger_id=trigger_id)
|
||||
|
||||
def update_trigger(self, ctxt, trigger):
|
||||
cctxt = self.client.prepare(version='1.0')
|
||||
return cctxt.call(ctxt, 'update_trigger', trigger=trigger)
|
||||
return self._client.call(ctxt, 'update_trigger', trigger=trigger)
|
||||
|
|
|
@ -94,7 +94,7 @@ class TimeTriggerTestCase(base.TestCase):
|
|||
operation_id = "1"
|
||||
self._register_operation(operation_id)
|
||||
|
||||
self.assertRaisesRegexp(exception.InvalidInput,
|
||||
self.assertRaisesRegexp(exception.ScheduledOperationExist,
|
||||
"The operation_id.* is exist",
|
||||
self._trigger.register_operation,
|
||||
operation_id)
|
||||
|
|
|
@ -10,10 +10,12 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
from oslo_messaging.rpc import dispatcher as rpc_dispatcher
|
||||
|
||||
from karbor.common import constants
|
||||
from karbor import context
|
||||
from karbor import exception
|
||||
from karbor import objects
|
||||
from karbor.services.operationengine import manager as service_manager
|
||||
from karbor.tests import base
|
||||
|
@ -25,8 +27,13 @@ class FakeTriggerManager(object):
|
|||
self._trigger = {}
|
||||
|
||||
def register_operation(self, trigger_id, operation_id, **kwargs):
|
||||
if trigger_id in self._trigger:
|
||||
self._trigger[trigger_id].append(operation_id)
|
||||
if trigger_id not in self._trigger:
|
||||
self._trigger[trigger_id] = []
|
||||
|
||||
if operation_id in self._trigger[trigger_id]:
|
||||
raise exception.ScheduledOperationExist(op_id=operation_id)
|
||||
|
||||
self._trigger[trigger_id].append(operation_id)
|
||||
|
||||
def unregister_operation(self, trigger_id, operation_id, **kwargs):
|
||||
pass
|
||||
|
@ -66,11 +73,16 @@ class OperationEngineManagerTestCase(base.TestCase):
|
|||
operation_id = self._operation.id
|
||||
|
||||
self._create_operation_state(operation_id)
|
||||
|
||||
op = self._create_scheduled_operation(self._trigger.id, False)
|
||||
self._create_operation_state(op.id)
|
||||
|
||||
self.manager._restore()
|
||||
|
||||
trigger_manager = self.manager._trigger_manager
|
||||
self.assertTrue(trigger_id in trigger_manager._trigger)
|
||||
self.assertTrue(operation_id in trigger_manager._trigger[trigger_id])
|
||||
self.assertFalse(op.id in trigger_manager._trigger[trigger_id])
|
||||
|
||||
def test_create_operation(self):
|
||||
operation_id = "1234"
|
||||
|
@ -97,6 +109,23 @@ class OperationEngineManagerTestCase(base.TestCase):
|
|||
self.ctxt, self._operation.id)
|
||||
self.assertEqual(constants.OPERATION_STATE_DELETED, state.state)
|
||||
|
||||
@mock.patch.object(FakeTriggerManager, 'unregister_operation')
|
||||
def test_suspend_resume_operation(self, unregister):
|
||||
op_id = 'suspend'
|
||||
trigger_id = "trigger"
|
||||
|
||||
self.manager.resume_scheduled_operation(self.ctxt, op_id, trigger_id)
|
||||
self.assertTrue(op_id in (
|
||||
self.manager._trigger_manager._trigger[trigger_id]))
|
||||
|
||||
self.manager.resume_scheduled_operation(self.ctxt, op_id, trigger_id)
|
||||
self.assertTrue(1 == len(
|
||||
self.manager._trigger_manager._trigger[trigger_id]))
|
||||
|
||||
# resume
|
||||
self.manager.suspend_scheduled_operation(self.ctxt, op_id, trigger_id)
|
||||
unregister.assert_called_once_with(trigger_id, op_id)
|
||||
|
||||
def _create_one_trigger(self):
|
||||
trigger_info = {
|
||||
'project_id': "123",
|
||||
|
@ -111,7 +140,7 @@ class OperationEngineManagerTestCase(base.TestCase):
|
|||
trigger.create()
|
||||
return trigger
|
||||
|
||||
def _create_scheduled_operation(self, trigger_id):
|
||||
def _create_scheduled_operation(self, trigger_id, enabled=True):
|
||||
operation_info = {
|
||||
"name": "123",
|
||||
'description': '123',
|
||||
|
@ -122,6 +151,7 @@ class OperationEngineManagerTestCase(base.TestCase):
|
|||
"operation_definition": {
|
||||
"plan_id": ""
|
||||
},
|
||||
"enabled": enabled
|
||||
}
|
||||
operation = objects.ScheduledOperation(self.ctxt, **operation_info)
|
||||
operation.create()
|
||||
|
|
Loading…
Reference in New Issue