Merge "Add checking audit state" into stable/ocata

This commit is contained in:
Jenkins 2017-02-16 13:53:58 +00:00 committed by Gerrit Code Review
commit 1caf89686c
3 changed files with 156 additions and 1 deletions

View File

@ -50,6 +50,21 @@ from watcher.decision_engine import rpcapi
from watcher import objects
ALLOWED_AUDIT_TRANSITIONS = {
objects.audit.State.PENDING:
[objects.audit.State.ONGOING, objects.audit.State.CANCELLED],
objects.audit.State.ONGOING:
[objects.audit.State.FAILED, objects.audit.State.SUCCEEDED,
objects.audit.State.CANCELLED],
objects.audit.State.FAILED:
[objects.audit.State.DELETED],
objects.audit.State.SUCCEEDED:
[objects.audit.State.DELETED],
objects.audit.State.CANCELLED:
[objects.audit.State.DELETED]
}
class AuditPostType(wtypes.Base):
audit_template_uuid = wtypes.wsattr(types.uuid, mandatory=False)
@ -561,6 +576,17 @@ class AuditsController(rest.RestController):
except api_utils.JSONPATCH_EXCEPTIONS as e:
raise exception.PatchError(patch=patch, reason=e)
initial_state = audit_dict['state']
new_state = api_utils.get_patch_value(patch, 'state')
allowed_states = ALLOWED_AUDIT_TRANSITIONS.get(initial_state, [])
if new_state is not None and new_state not in allowed_states:
error_message = _("State transition not allowed: "
"(%(initial_state)s -> %(new_state)s)")
raise exception.PatchError(
patch=patch,
reason=error_message % dict(
initial_state=initial_state, new_state=new_state))
# Update only the fields that have changed
for field in objects.Audit.fields:
try:

View File

@ -73,6 +73,12 @@ def apply_jsonpatch(doc, patch):
return jsonpatch.apply_patch(doc, jsonpatch.JsonPatch(patch))
def get_patch_value(patch, key):
for p in patch:
if p['op'] == 'replace' and p['path'] == '/%s' % key:
return p['value']
def as_filters_dict(**filters):
filters_dict = {}
for filter_name, filter_value in filters.items():

View File

@ -11,6 +11,7 @@
# limitations under the License.
import datetime
import itertools
import mock
from oslo_config import cfg
@ -267,7 +268,7 @@ class TestPatch(api_base.FunctionalTest):
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
new_state = objects.audit.State.SUCCEEDED
new_state = objects.audit.State.CANCELLED
response = self.get_json('/audits/%s' % self.audit.uuid)
self.assertNotEqual(new_state, response['state'])
@ -343,6 +344,128 @@ class TestPatch(api_base.FunctionalTest):
self.assertTrue(response.json['error_message'])
ALLOWED_TRANSITIONS = [
{"original_state": objects.audit.State.PENDING,
"new_state": objects.audit.State.ONGOING},
{"original_state": objects.audit.State.PENDING,
"new_state": objects.audit.State.CANCELLED},
{"original_state": objects.audit.State.ONGOING,
"new_state": objects.audit.State.FAILED},
{"original_state": objects.audit.State.ONGOING,
"new_state": objects.audit.State.SUCCEEDED},
{"original_state": objects.audit.State.ONGOING,
"new_state": objects.audit.State.CANCELLED},
{"original_state": objects.audit.State.FAILED,
"new_state": objects.audit.State.DELETED},
{"original_state": objects.audit.State.SUCCEEDED,
"new_state": objects.audit.State.DELETED},
{"original_state": objects.audit.State.CANCELLED,
"new_state": objects.audit.State.DELETED},
]
class TestPatchStateTransitionDenied(api_base.FunctionalTest):
STATES = [
ap_state for ap_state in objects.audit.State.__dict__
if not ap_state.startswith("_")
]
scenarios = [
(
"%s -> %s" % (original_state, new_state),
{"original_state": original_state,
"new_state": new_state},
)
for original_state, new_state
in list(itertools.product(STATES, STATES))
if original_state != new_state
and {"original_state": original_state,
"new_state": new_state} not in ALLOWED_TRANSITIONS
]
def setUp(self):
super(TestPatchStateTransitionDenied, self).setUp()
obj_utils.create_test_goal(self.context)
obj_utils.create_test_strategy(self.context)
obj_utils.create_test_audit_template(self.context)
self.audit = obj_utils.create_test_audit(self.context,
state=self.original_state)
p = mock.patch.object(db_api.BaseConnection, 'update_audit')
self.mock_audit_update = p.start()
self.mock_audit_update.side_effect = self._simulate_rpc_audit_update
self.addCleanup(p.stop)
def _simulate_rpc_audit_update(self, audit):
audit.save()
return audit
def test_replace_denied(self):
response = self.get_json('/audits/%s' % self.audit.uuid)
self.assertNotEqual(self.new_state, response['state'])
response = self.patch_json(
'/audits/%s' % self.audit.uuid,
[{'path': '/state', 'value': self.new_state,
'op': 'replace'}],
expect_errors=True)
self.assertEqual('application/json', response.content_type)
self.assertEqual(400, response.status_code)
self.assertTrue(response.json['error_message'])
response = self.get_json('/audits/%s' % self.audit.uuid)
self.assertEqual(self.original_state, response['state'])
class TestPatchStateTransitionOk(api_base.FunctionalTest):
scenarios = [
(
"%s -> %s" % (transition["original_state"],
transition["new_state"]),
transition
)
for transition in ALLOWED_TRANSITIONS
]
def setUp(self):
super(TestPatchStateTransitionOk, self).setUp()
obj_utils.create_test_goal(self.context)
obj_utils.create_test_strategy(self.context)
obj_utils.create_test_audit_template(self.context)
self.audit = obj_utils.create_test_audit(self.context,
state=self.original_state)
p = mock.patch.object(db_api.BaseConnection, 'update_audit')
self.mock_audit_update = p.start()
self.mock_audit_update.side_effect = self._simulate_rpc_audit_update
self.addCleanup(p.stop)
def _simulate_rpc_audit_update(self, audit):
audit.save()
return audit
@mock.patch('oslo_utils.timeutils.utcnow')
def test_replace_ok(self, mock_utcnow):
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
response = self.get_json('/audits/%s' % self.audit.uuid)
self.assertNotEqual(self.new_state, response['state'])
response = self.patch_json(
'/audits/%s' % self.audit.uuid,
[{'path': '/state', 'value': self.new_state,
'op': 'replace'}])
self.assertEqual('application/json', response.content_type)
self.assertEqual(200, response.status_code)
response = self.get_json('/audits/%s' % self.audit.uuid)
self.assertEqual(self.new_state, response['state'])
return_updated_at = timeutils.parse_isotime(
response['updated_at']).replace(tzinfo=None)
self.assertEqual(test_time, return_updated_at)
class TestPost(api_base.FunctionalTest):
def setUp(self):