Add suspend/resume operation interface in Operation Engine

suspend interface will pause the operation and it will not be triggered;
resume interface will resume the operation to be triggered.

Change-Id: I2d3fb84d53d5bad783bc4b1236091b7fca6727d8
Closes-Bug: #1606467
This commit is contained in:
zengchen 2016-08-02 17:08:23 +08:00
parent 02bbf26751
commit 29fee84c52
8 changed files with 94 additions and 21 deletions

View File

@ -248,6 +248,7 @@ class ScheduledOperationController(wsgi.Controller):
context, operation_id, trigger_id)
except (exception.InvalidInput,
exception.ScheduledOperationExist,
exception.TriggerIsInvalid) as ex:
raise exc.HTTPBadRequest(explanation=ex.msg)

View File

@ -162,6 +162,10 @@ class InvalidInput(Invalid):
message = _("Invalid input received: %(reason)s")
class ScheduledOperationExist(Invalid):
message = _("Scheduled Operation%(op_id)s exists")
class NotFound(KarborException):
message = _("Resource could not be found.")
code = 404

View File

@ -41,6 +41,14 @@ class API(base.Base):
self.operationengine_rpcapi.delete_scheduled_operation(
context, operation_id, trigger_id)
def suspend_scheduled_operation(self, context, operation_id, trigger_id):
self.operationengine_rpcapi.suspend_scheduled_operation(
context, operation_id, trigger_id)
def resume_scheduled_operation(self, context, operation_id, trigger_id):
self.operationengine_rpcapi.resume_scheduled_operation(
context, operation_id, trigger_id)
def create_trigger(self, context, trigger):
self.operationengine_rpcapi.create_trigger(context, trigger)

View File

@ -103,7 +103,7 @@ class TimeTrigger(triggers.BaseTrigger):
def register_operation(self, operation_id, **kwargs):
if operation_id in self._operation_ids:
msg = (_("The operation_id(%s) is exist") % operation_id)
raise exception.InvalidInput(msg)
raise exception.ScheduledOperationExist(msg)
if self._greenthread and not self._greenthread.running:
raise exception.TriggerIsInvalid(trigger_id=self._id)

View File

@ -95,8 +95,11 @@ class OperationEngineManager(manager.Manager):
break
for state in states:
resume = (state.state in resume_states)
operation = state.operation
if not operation.enabled:
continue
resume = (state.state in resume_states)
self._trigger_manager.register_operation(
operation.trigger_id, operation.id,
resume=resume, end_time_for_run=state.end_time_for_run)
@ -111,7 +114,8 @@ class OperationEngineManager(manager.Manager):
@messaging.expected_exceptions(exception.TriggerNotFound,
exception.InvalidInput,
exception.TriggerIsInvalid,
exception.AuthorizationFailure)
exception.AuthorizationFailure,
exception.ScheduledOperationExist)
def create_scheduled_operation(self, context, operation_id, trigger_id):
LOG.debug("Create scheduled operation.")
@ -150,6 +154,24 @@ class OperationEngineManager(manager.Manager):
self._trigger_manager.unregister_operation(trigger_id, operation_id)
self._user_trust_manager.delete_operation(context, operation_id)
@messaging.expected_exceptions(exception.TriggerNotFound)
def suspend_scheduled_operation(self, context, operation_id, trigger_id):
LOG.debug("Suspend scheduled operation.")
self._trigger_manager.unregister_operation(trigger_id, operation_id)
@messaging.expected_exceptions(exception.TriggerNotFound,
exception.TriggerIsInvalid)
def resume_scheduled_operation(self, context, operation_id, trigger_id):
LOG.debug("Resume scheduled operation.")
try:
self._trigger_manager.register_operation(
trigger_id, operation_id)
except exception.ScheduledOperationExist:
pass
except Exception:
raise
@messaging.expected_exceptions(exception.InvalidInput)
def create_trigger(self, context, trigger):
self._trigger_manager.add_trigger(trigger.id, trigger.type,

View File

@ -40,27 +40,35 @@ class OperationEngineAPI(object):
version=self.RPC_API_VERSION)
serializer = objects_base.KarborObjectSerializer()
self.client = rpc.get_client(target, version_cap=None,
serializer=serializer)
client = rpc.get_client(target, version_cap=None,
serializer=serializer)
self._client = client.prepare(version='1.0')
def create_scheduled_operation(self, ctxt, operation_id, trigger_id):
cctxt = self.client.prepare(version='1.0')
return cctxt.call(ctxt, 'create_scheduled_operation',
operation_id=operation_id, trigger_id=trigger_id)
return self._client.call(ctxt, 'create_scheduled_operation',
operation_id=operation_id,
trigger_id=trigger_id)
def delete_scheduled_operation(self, ctxt, operation_id, trigger_id):
cctxt = self.client.prepare(version='1.0')
return cctxt.call(ctxt, 'delete_scheduled_operation',
operation_id=operation_id, trigger_id=trigger_id)
return self._client.call(ctxt, 'delete_scheduled_operation',
operation_id=operation_id,
trigger_id=trigger_id)
def suspend_scheduled_operation(self, ctxt, operation_id, trigger_id):
return self._client.call(ctxt, 'suspend_scheduled_operation',
operation_id=operation_id,
trigger_id=trigger_id)
def resume_scheduled_operation(self, ctxt, operation_id, trigger_id):
return self._client.call(ctxt, 'resume_scheduled_operation',
operation_id=operation_id,
trigger_id=trigger_id)
def create_trigger(self, ctxt, trigger):
cctxt = self.client.prepare(version='1.0')
return cctxt.call(ctxt, 'create_trigger', trigger=trigger)
return self._client.call(ctxt, 'create_trigger', trigger=trigger)
def delete_trigger(self, ctxt, trigger_id):
cctxt = self.client.prepare(version='1.0')
return cctxt.call(ctxt, 'delete_trigger', trigger_id=trigger_id)
return self._client.call(ctxt, 'delete_trigger', trigger_id=trigger_id)
def update_trigger(self, ctxt, trigger):
cctxt = self.client.prepare(version='1.0')
return cctxt.call(ctxt, 'update_trigger', trigger=trigger)
return self._client.call(ctxt, 'update_trigger', trigger=trigger)

View File

@ -94,7 +94,7 @@ class TimeTriggerTestCase(base.TestCase):
operation_id = "1"
self._register_operation(operation_id)
self.assertRaisesRegexp(exception.InvalidInput,
self.assertRaisesRegexp(exception.ScheduledOperationExist,
"The operation_id.* is exist",
self._trigger.register_operation,
operation_id)

View File

@ -10,10 +10,12 @@
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_messaging.rpc import dispatcher as rpc_dispatcher
from karbor.common import constants
from karbor import context
from karbor import exception
from karbor import objects
from karbor.services.operationengine import manager as service_manager
from karbor.tests import base
@ -25,8 +27,13 @@ class FakeTriggerManager(object):
self._trigger = {}
def register_operation(self, trigger_id, operation_id, **kwargs):
if trigger_id in self._trigger:
self._trigger[trigger_id].append(operation_id)
if trigger_id not in self._trigger:
self._trigger[trigger_id] = []
if operation_id in self._trigger[trigger_id]:
raise exception.ScheduledOperationExist(op_id=operation_id)
self._trigger[trigger_id].append(operation_id)
def unregister_operation(self, trigger_id, operation_id, **kwargs):
pass
@ -66,11 +73,16 @@ class OperationEngineManagerTestCase(base.TestCase):
operation_id = self._operation.id
self._create_operation_state(operation_id)
op = self._create_scheduled_operation(self._trigger.id, False)
self._create_operation_state(op.id)
self.manager._restore()
trigger_manager = self.manager._trigger_manager
self.assertTrue(trigger_id in trigger_manager._trigger)
self.assertTrue(operation_id in trigger_manager._trigger[trigger_id])
self.assertFalse(op.id in trigger_manager._trigger[trigger_id])
def test_create_operation(self):
operation_id = "1234"
@ -97,6 +109,23 @@ class OperationEngineManagerTestCase(base.TestCase):
self.ctxt, self._operation.id)
self.assertEqual(constants.OPERATION_STATE_DELETED, state.state)
@mock.patch.object(FakeTriggerManager, 'unregister_operation')
def test_suspend_resume_operation(self, unregister):
op_id = 'suspend'
trigger_id = "trigger"
self.manager.resume_scheduled_operation(self.ctxt, op_id, trigger_id)
self.assertTrue(op_id in (
self.manager._trigger_manager._trigger[trigger_id]))
self.manager.resume_scheduled_operation(self.ctxt, op_id, trigger_id)
self.assertTrue(1 == len(
self.manager._trigger_manager._trigger[trigger_id]))
# resume
self.manager.suspend_scheduled_operation(self.ctxt, op_id, trigger_id)
unregister.assert_called_once_with(trigger_id, op_id)
def _create_one_trigger(self):
trigger_info = {
'project_id': "123",
@ -111,7 +140,7 @@ class OperationEngineManagerTestCase(base.TestCase):
trigger.create()
return trigger
def _create_scheduled_operation(self, trigger_id):
def _create_scheduled_operation(self, trigger_id, enabled=True):
operation_info = {
"name": "123",
'description': '123',
@ -122,6 +151,7 @@ class OperationEngineManagerTestCase(base.TestCase):
"operation_definition": {
"plan_id": ""
},
"enabled": enabled
}
operation = objects.ScheduledOperation(self.ctxt, **operation_info)
operation.create()