# Copyright 2013 - Mirantis, Inc.
# Copyright 2015 - StackStorm, Inc.
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.

"""Unit tests for the '/v2/workflows' REST API endpoints."""

import copy
import datetime
import hashlib
from unittest import mock

import json
import sqlalchemy as sa
import yaml

from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models
from mistral import exceptions as exc
from mistral.tests.unit.api import base
from mistral.tests.unit import base as unit_base
from mistral.utils import safe_yaml
from mistral_lib import utils

# Single-workflow definition with one required input.
WF_DEFINITION = """
---
version: '2.0'

flow:
  type: direct
  input:
    - param1

  tasks:
    task1:
      action: std.echo output="Hi"
"""

# Same workflow as WF_DEFINITION with an additional input, used to check
# that the checksum changes on update.
WF_DEFINITION_UPDATED = """
---
version: '2.0'

flow:
  type: direct
  input:
    - param1
    - param2

  tasks:
    task1:
      action: std.echo output="Hi"
"""

# DB model returned by the mocked DB API and its expected REST
# representation (WF).
WF_DB = models.WorkflowDefinition(
    id='123e4567-e89b-12d3-a456-426655440000',
    name='flow',
    definition=WF_DEFINITION,
    created_at=datetime.datetime(1970, 1, 1),
    updated_at=datetime.datetime(1970, 1, 1),
    spec={'input': ['param1']}
)

WF_DB_SYSTEM = WF_DB.get_clone()
WF_DB_SYSTEM.is_system = True

WF = {
    'id': '123e4567-e89b-12d3-a456-426655440000',
    'name': 'flow',
    'definition': WF_DEFINITION,
    'created_at': '1970-01-01 00:00:00',
    'updated_at': '1970-01-01 00:00:00',
    'input': 'param1',
    'interface': {"output": [], "input": ["param1"]}
}

# The same workflow stored in the 'abc' namespace.
WF_DB_WITHIN_ABC_NAMESPACE = models.WorkflowDefinition(
    id='234560fe-162a-4060-a16a-a0d9eee9b408',
    name='flow',
    namespace='abc',
    definition=WF_DEFINITION,
    created_at=datetime.datetime(1970, 1, 1),
    updated_at=datetime.datetime(1970, 1, 1),
    spec={'input': ['param1']}
)

WF_WITH_NAMESPACE = {
    'id': '234560fe-162a-4060-a16a-a0d9eee9b408',
    'name': 'flow',
    'namespace': 'abc',
    'definition': WF_DEFINITION,
    'created_at': '1970-01-01 00:00:00',
    'updated_at': '1970-01-01 00:00:00',
    'input': 'param1',
    'interface': {'input': ['param1'], 'output': []}
}

# Workflow whose second input has a default value.
WF_DEFINITION_WITH_INPUT = """
---
version: '2.0'

flow:
  type: direct
  input:
    - param1
    - param2: 2

  tasks:
    task1:
      action: std.echo output="Hi"
"""

WF_DB_WITH_INPUT = models.WorkflowDefinition(
    name='flow',
    definition=WF_DEFINITION_WITH_INPUT,
    created_at=datetime.datetime(1970, 1, 1),
    updated_at=datetime.datetime(1970, 1, 1),
    spec={'input': ['param1', {'param2': 2}]}
)

WF_WITH_DEFAULT_INPUT = {
    'name': 'flow',
    'definition': WF_DEFINITION_WITH_INPUT,
    'created_at': '1970-01-01 00:00:00',
    'updated_at': '1970-01-01 00:00:00',
    'input': 'param1, param2="2"',
    'interface': {
        "input": ["param1", {"param2": 2}],
        "output": []
    }
}

WF_DB_PROJECT_ID = WF_DB.get_clone()
WF_DB_PROJECT_ID.project_id = ''

UPDATED_WF_DEFINITION = """
---
version: '2.0'

flow:
  type: direct
  input:
    - param1
    - param2

  tasks:
    task1:
      action: std.echo output="Hi"
"""

UPDATED_WF_DB = copy.copy(WF_DB)
UPDATED_WF_DB['definition'] = UPDATED_WF_DEFINITION
UPDATED_WF = copy.deepcopy(WF)
UPDATED_WF['definition'] = UPDATED_WF_DEFINITION

# Definitions that are expected to be rejected by validation.
WF_DEF_INVALID_MODEL_EXCEPTION = """
---
version: '2.0'

flow:
  type: direct

  tasks:
    task1:
      action: std.echo output="Hi"
      workflow: wf1
"""

WF_DEF_DSL_PARSE_EXCEPTION = """
---
%
"""

WF_DEF_YAQL_PARSE_EXCEPTION = """
---
version: '2.0'

flow:
  type: direct

  tasks:
    task1:
      action: std.echo output=<% * %>
"""

# A single document that declares two workflows.
WFS_DEFINITION = """
---
version: '2.0'

wf1:
  tasks:
    task1:
      action: std.echo output="Hello"
wf2:
  tasks:
    task1:
      action: std.echo output="Mistral"
"""

WFS_YAML = safe_yaml.safe_load(WFS_DEFINITION)

FIRST_WF_DEF = yaml.dump({
    'version': '2.0',
    'wf1': WFS_YAML['wf1']
})
SECOND_WF_DEF = yaml.dump({
    'version': '2.0',
    'wf2': WFS_YAML['wf2']
})

# Values the DB API is expected to be called with for each workflow of
# WFS_DEFINITION.
FIRST_WF_DICT = {
    'name': 'wf1',
    'tasks': {
        'task1': {
            'action': 'std.echo output="Hello"',
            'name': 'task1',
            'type': 'direct',
            'version': '2.0'
        }
    },
    'version': '2.0'
}
FIRST_WF = {
    'name': 'wf1',
    'tags': [],
    'definition': FIRST_WF_DEF,
    'spec': FIRST_WF_DICT,
    'scope': 'private',
    'namespace': '',
    'checksum': '1b786ecb96b9358f67718a407c274885',
    'is_system': False
}

SECOND_WF_DICT = {
    'name': 'wf2',
    'tasks': {
        'task1': {
            'action': 'std.echo output="Mistral"',
            'name': 'task1',
            'type': 'direct',
            'version': '2.0'
        }
    },
    'version': '2.0'
}
SECOND_WF = {
    'name': 'wf2',
    'tags': [],
    'definition': SECOND_WF_DEF,
    'spec': SECOND_WF_DICT,
    'scope': 'private',
    'namespace': '',
    'checksum': '5803661ccfdf226c95254b2a8a295226',
    'is_system': False
}

# Replacements for DB API functions used with mock.patch.object().
MOCK_WF = mock.MagicMock(return_value=WF_DB)
MOCK_WF_SYSTEM = mock.MagicMock(return_value=WF_DB_SYSTEM)
MOCK_WF_WITH_INPUT = mock.MagicMock(return_value=WF_DB_WITH_INPUT)
MOCK_WFS = mock.MagicMock(return_value=[WF_DB])
MOCK_UPDATED_WF = mock.MagicMock(return_value=UPDATED_WF_DB)
MOCK_DELETE = mock.MagicMock(return_value=None)
MOCK_EMPTY = mock.MagicMock(return_value=[])
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError())
MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntryError())


class TestWorkflowsController(base.APITest):
    """Tests for GET/POST/PUT/DELETE requests against '/v2/workflows'."""

    @mock.patch.object(db_api, "get_workflow_definition", MOCK_WF)
    def test_get(self):
        resp = self.app.get('/v2/workflows/123')

        resp_json = resp.json
        # 'interface' comes back as a JSON string; decode it to compare.
        resp_json['interface'] = json.loads(resp_json['interface'])

        self.assertEqual(200, resp.status_int)
        self.assertDictEqual(WF, resp_json)

    @mock.patch('mistral.db.v2.api.get_workflow_definition')
    def test_get_with_fields_filter(self, mocked_get):
        mocked_get.return_value = (WF['id'], WF['name'],)

        resp = self.app.get('/v2/workflows/123?fields=name')

        expected = {
            'id': WF['id'],
            'name': WF['name'],
        }

        self.assertEqual(200, resp.status_int)
        self.assertDictEqual(expected, resp.json)

    @mock.patch.object(db_api, 'get_workflow_definition')
    def test_get_operational_error(self, mocked_get):
        """GET returns 200 when only the first DB call fails."""
        mocked_get.side_effect = [
            # Emulating DB OperationalError
            sa.exc.OperationalError('Mock', 'mock', 'mock'),
            WF_DB  # Successful run
        ]

        resp = self.app.get('/v2/workflows/123')

        resp_json = resp.json
        resp_json['interface'] = json.loads(resp_json['interface'])

        self.assertEqual(200, resp.status_int)
        self.assertDictEqual(WF, resp_json)

    @mock.patch.object(db_api, "get_workflow_definition", MOCK_WF_WITH_INPUT)
    def test_get_with_input(self):
        resp = self.app.get('/v2/workflows/123')

        self.maxDiff = None

        resp_json = resp.json
        resp_json['interface'] = json.loads(resp_json['interface'])

        self.assertEqual(200, resp.status_int)
        self.assertDictEqual(WF_WITH_DEFAULT_INPUT, resp_json)

    @mock.patch.object(db_api, "get_workflow_definition", MOCK_NOT_FOUND)
    def test_get_not_found(self):
        resp = self.app.get('/v2/workflows/123', expect_errors=True)

        self.assertEqual(404, resp.status_int)

    @mock.patch.object(
        db_api, "update_workflow_definition", MOCK_UPDATED_WF
    )
    def test_put(self):
        resp = self.app.put(
            '/v2/workflows',
            UPDATED_WF_DEFINITION,
            headers={'Content-Type': 'text/plain'}
        )

        self.maxDiff = None

        self.assertEqual(200, resp.status_int)
        self.assertDictEqual({'workflows': [UPDATED_WF]}, resp.json)

    @mock.patch("mistral.services.workflows.update_workflows")
    def test_put_with_uuid(self, update_mock):
        """PUT on a workflow UUID passes it on as 'identifier'."""
        update_mock.return_value = [UPDATED_WF_DB]

        resp = self.app.put(
            '/v2/workflows/123e4567-e89b-12d3-a456-426655440000',
            UPDATED_WF_DEFINITION,
            headers={'Content-Type': 'text/plain'}
        )

        self.assertEqual(200, resp.status_int)

        update_mock.assert_called_once_with(
            UPDATED_WF_DEFINITION,
            scope='private',
            identifier='123e4567-e89b-12d3-a456-426655440000',
            namespace='',
            validate=True
        )
        self.assertDictEqual(UPDATED_WF, resp.json)

    @mock.patch(
        "mistral.db.v2.sqlalchemy.api.get_workflow_definition",
        return_value=WF_DB_SYSTEM
    )
    def test_put_system(self, get_mock):
        """Updating a system workflow is rejected with 400."""
        resp = self.app.put(
            '/v2/workflows',
            UPDATED_WF_DEFINITION,
            headers={'Content-Type': 'text/plain'},
            expect_errors=True
        )

        self.assertEqual(400, resp.status_int)
        self.assertIn(
            "Can not modify a system",
            resp.body.decode()
        )

    @mock.patch.object(db_api, "update_workflow_definition")
    def test_put_public(self, mock_update):
        mock_update.return_value = UPDATED_WF_DB

        resp = self.app.put(
            '/v2/workflows?scope=public',
            UPDATED_WF_DEFINITION,
            headers={'Content-Type': 'text/plain'}
        )

        self.assertEqual(200, resp.status_int)
        self.assertDictEqual({'workflows': [UPDATED_WF]}, resp.json)

        # The second positional argument holds the values to store.
        self.assertEqual("public", mock_update.call_args[0][1]['scope'])

    def test_put_wrong_scope(self):
        resp = self.app.put(
            '/v2/workflows?scope=unique',
            UPDATED_WF_DEFINITION,
            headers={'Content-Type': 'text/plain'},
            expect_errors=True
        )

        self.assertEqual(400, resp.status_int)
        self.assertIn("Scope must be one of the following", resp.body.decode())

    @mock.patch.object(
        db_api, "update_workflow_definition", MOCK_WF_WITH_INPUT
    )
    def test_put_with_input(self):
        resp = self.app.put(
            '/v2/workflows',
            WF_DEFINITION_WITH_INPUT,
            headers={'Content-Type': 'text/plain'}
        )

        self.maxDiff = None

        self.assertEqual(200, resp.status_int)
        self.assertDictEqual({'workflows': [WF_WITH_DEFAULT_INPUT]}, resp.json)

    @mock.patch.object(
        db_api, "update_workflow_definition", MOCK_NOT_FOUND
    )
    def test_put_not_found(self):
        resp = self.app.put(
            '/v2/workflows',
            UPDATED_WF_DEFINITION,
            headers={'Content-Type': 'text/plain'},
            expect_errors=True,
        )

        self.assertEqual(404, resp.status_int)

    def test_put_invalid(self):
        resp = self.app.put(
            '/v2/workflows',
            WF_DEF_INVALID_MODEL_EXCEPTION,
            headers={'Content-Type': 'text/plain'},
            expect_errors=True
        )

        self.assertEqual(400, resp.status_int)
        self.assertIn("Invalid DSL", resp.body.decode())

    @mock.patch.object(
        db_api, "update_workflow_definition", MOCK_UPDATED_WF
    )
    def test_put_invalid_skip_validation(self):
        """An invalid definition is accepted with '?skip_validation'."""
        self.override_config('validation_mode', 'enabled', 'api')

        resp = self.app.put(
            '/v2/workflows?skip_validation',
            WF_DEF_INVALID_MODEL_EXCEPTION,
            headers={'Content-Type': 'text/plain'},
            expect_errors=True
        )

        self.assertEqual(200, resp.status_int)

    @mock.patch.object(db_api, "update_workflow_definition")
    def test_put_multiple(self, mock_mtd):
        """Each workflow of a multi-workflow document is updated once."""
        spec_mock = mock_mtd.return_value.get.return_value
        spec_mock.get.return_value = {}

        self.app.put(
            '/v2/workflows',
            WFS_DEFINITION,
            headers={'Content-Type': 'text/plain'}
        )

        self.assertEqual(2, mock_mtd.call_count)
        mock_mtd.assert_any_call('wf1', FIRST_WF)
        mock_mtd.assert_any_call('wf2', SECOND_WF)

    def test_put_more_workflows_with_uuid(self):
        resp = self.app.put(
            '/v2/workflows/123e4567-e89b-12d3-a456-426655440000',
            WFS_DEFINITION,
            headers={'Content-Type': 'text/plain'},
            expect_errors=True
        )

        self.assertEqual(400, resp.status_int)
        self.assertIn(
            "More than one workflows are not supported for update",
            resp.body.decode()
        )

    @mock.patch.object(db_api, "create_workflow_definition")
    def test_post(self, mock_mtd):
        mock_mtd.return_value = WF_DB

        resp = self.app.post(
            '/v2/workflows',
            WF_DEFINITION,
            headers={'Content-Type': 'text/plain'}
        )

        self.assertEqual(201, resp.status_int)
        self.assertDictEqual({'workflows': [WF]}, resp.json)

        self.assertEqual(1, mock_mtd.call_count)

        spec = mock_mtd.call_args[0][0]['spec']

        self.assertIsNotNone(spec)
        self.assertEqual(WF_DB.name, spec['name'])

    @mock.patch.object(db_api, "create_workflow_definition")
    def test_post_public(self, mock_mtd):
        mock_mtd.return_value = WF_DB

        resp = self.app.post(
            '/v2/workflows?scope=public',
            WF_DEFINITION,
            headers={'Content-Type': 'text/plain'}
        )

        self.assertEqual(201, resp.status_int)
        self.assertEqual({"workflows": [WF]}, resp.json)

        self.assertEqual("public", mock_mtd.call_args[0][0]['scope'])

    def test_post_wrong_scope(self):
        resp = self.app.post(
            '/v2/workflows?scope=unique',
            WF_DEFINITION,
            headers={'Content-Type': 'text/plain'},
            expect_errors=True
        )

        self.assertEqual(400, resp.status_int)
        self.assertIn("Scope must be one of the following", resp.body.decode())

    @mock.patch.object(db_api, "create_workflow_definition", MOCK_DUPLICATE)
    def test_post_dup(self):
        resp = self.app.post(
            '/v2/workflows',
            WF_DEFINITION,
            headers={'Content-Type': 'text/plain'},
            expect_errors=True
        )

        self.assertEqual(409, resp.status_int)

    @mock.patch.object(db_api, "create_workflow_definition")
    def test_post_multiple(self, mock_mtd):
        """Each workflow of a multi-workflow document is created once."""
        spec_mock = mock_mtd.return_value.get.return_value
        spec_mock.get.return_value = {}

        self.app.post(
            '/v2/workflows',
            WFS_DEFINITION,
            headers={'Content-Type': 'text/plain'}
        )

        self.assertEqual(2, mock_mtd.call_count)
        mock_mtd.assert_any_call(FIRST_WF)
        mock_mtd.assert_any_call(SECOND_WF)

    def test_post_invalid(self):
        resp = self.app.post(
            '/v2/workflows',
            WF_DEF_INVALID_MODEL_EXCEPTION,
            headers={'Content-Type': 'text/plain'},
            expect_errors=True
        )

        self.assertEqual(400, resp.status_int)
        self.assertIn("Invalid DSL", resp.body.decode())

    def test_post_invalid_skip_validation(self):
        """An invalid definition is created with '?skip_validation'."""
        self.override_config('validation_mode', 'enabled', 'api')

        resp = self.app.post(
            '/v2/workflows?skip_validation',
            WF_DEF_INVALID_MODEL_EXCEPTION,
            headers={'Content-Type': 'text/plain'},
            expect_errors=True
        )

        self.assertEqual(201, resp.status_int)

    @mock.patch.object(db_api, "delete_workflow_definition", MOCK_DELETE)
    @mock.patch.object(db_api, "get_workflow_definition", MOCK_WF)
    def test_delete(self):
        resp = self.app.delete('/v2/workflows/123')

        self.assertEqual(204, resp.status_int)

    @mock.patch(
        "mistral.db.v2.sqlalchemy.api.get_workflow_definition",
        return_value=WF_DB_SYSTEM
    )
    def test_delete_system(self, get_mock):
        """Deleting a system workflow is rejected with 400."""
        resp = self.app.delete('/v2/workflows/123', expect_errors=True)

        self.assertEqual(400, resp.status_int)
        self.assertIn(
            "Can not modify a system",
            resp.body.decode()
        )

    @mock.patch.object(db_api, "delete_workflow_definition", MOCK_NOT_FOUND)
    def test_delete_not_found(self):
        resp = self.app.delete('/v2/workflows/123', expect_errors=True)

        self.assertEqual(404, resp.status_int)

    @mock.patch.object(db_api, "get_workflow_definitions", MOCK_WFS)
    def test_get_all(self):
        resp = self.app.get('/v2/workflows')

        self.assertEqual(200, resp.status_int)

        resp_json = resp.json['workflows'][0]
        resp_json['interface'] = json.loads(resp_json['interface'])

        self.assertEqual(1, len(resp.json['workflows']))
        self.assertDictEqual(WF, resp_json)

    @mock.patch.object(db_api, 'get_workflow_definitions')
    def test_get_all_operational_error(self, mocked_get_all):
        """Listing returns 200 when only the first DB call fails."""
        mocked_get_all.side_effect = [
            # Emulating DB OperationalError
            sa.exc.OperationalError('Mock', 'mock', 'mock'),
            [WF_DB]  # Successful run
        ]

        resp = self.app.get('/v2/workflows')

        resp_workflow_json = resp.json['workflows'][0]
        resp_workflow_json['interface'] = \
            json.loads(resp_workflow_json['interface'])

        self.assertEqual(200, resp.status_int)
        self.assertEqual(1, len(resp.json['workflows']))
        self.assertDictEqual(WF, resp_workflow_json)

    @mock.patch.object(db_api, "get_workflow_definitions", MOCK_EMPTY)
    def test_get_all_empty(self):
        resp = self.app.get('/v2/workflows')

        self.assertEqual(200, resp.status_int)
        self.assertEqual(0, len(resp.json['workflows']))

    @mock.patch('mistral.db.v2.api.get_workflow_definitions')
    @mock.patch('mistral.context.MistralContext.from_environ')
    def test_get_all_projects_admin(self, mock_context, mock_get_wf_defs):
        """An admin's 'all_projects' query reaches the DB as insecure."""
        admin_ctx = unit_base.get_context(admin=True)
        mock_context.return_value = admin_ctx

        resp = self.app.get('/v2/workflows?all_projects=true')

        self.assertEqual(200, resp.status_int)
        self.assertTrue(mock_get_wf_defs.call_args[1].get('insecure', False))

    def test_get_all_projects_normal_user(self):
        resp = self.app.get(
            '/v2/workflows?all_projects=true',
            expect_errors=True
        )

        self.assertEqual(403, resp.status_int)

    @mock.patch.object(db_api, "get_workflow_definitions", MOCK_WFS)
    def test_get_all_pagination(self):
        resp = self.app.get(
            '/v2/workflows?limit=1&sort_keys=id,name')

        self.assertEqual(200, resp.status_int)
        self.assertIn('next', resp.json)

        resp_workflow_json = resp.json['workflows'][0]
        resp_workflow_json['interface'] = \
            json.loads(resp_workflow_json['interface'])

        self.assertEqual(1, len(resp.json['workflows']))
        self.assertDictEqual(WF, resp_workflow_json)

        # Parse the query string of the 'next' link into a dict.
        param_dict = utils.get_dict_from_string(
            resp.json['next'].split('?')[1],
            delimiter='&'
        )

        expected_dict = {
            'marker': '123e4567-e89b-12d3-a456-426655440000',
            'limit': 1,
            'sort_keys': 'id,name',
            'sort_dirs': 'asc,asc',
        }

        self.assertDictEqual(expected_dict, param_dict)

    def test_get_all_pagination_limit_negative(self):
        resp = self.app.get(
            '/v2/workflows?limit=-1&sort_keys=id,name&sort_dirs=asc,asc',
            expect_errors=True
        )

        self.assertEqual(400, resp.status_int)
        self.assertIn("Limit must be positive", resp.body.decode())

    def test_get_all_pagination_limit_not_integer(self):
        resp = self.app.get(
            '/v2/workflows?limit=1.1&sort_keys=id,name&sort_dirs=asc,asc',
            expect_errors=True
        )

        self.assertEqual(400, resp.status_int)
        self.assertIn("unable to convert to int", resp.body.decode())

    def test_get_all_pagination_invalid_sort_dirs_length(self):
        resp = self.app.get(
            '/v2/workflows?limit=1&sort_keys=id,name&sort_dirs=asc,asc,asc',
            expect_errors=True
        )

        self.assertEqual(400, resp.status_int)
        self.assertIn(
            "Length of sort_keys must be equal or greater than sort_dirs",
            resp.body.decode()
        )

    def test_get_all_pagination_unknown_direction(self):
        resp = self.app.get(
            '/v2/workflows?limit=1&sort_keys=id&sort_dirs=nonexist',
            expect_errors=True
        )

        self.assertEqual(400, resp.status_int)
        self.assertIn("Unknown sort direction", resp.body.decode())

    @mock.patch('mistral.db.v2.api.get_workflow_definitions')
    def test_get_all_with_fields_filter(self, mock_get_db_wfs):
        mock_get_db_wfs.return_value = [
            ('123e4567-e89b-12d3-a456-426655440000', 'fake_name')
        ]

        resp = self.app.get('/v2/workflows?fields=name')

        self.assertEqual(200, resp.status_int)
        self.assertEqual(1, len(resp.json['workflows']))

        expected_dict = {
            'id': '123e4567-e89b-12d3-a456-426655440000',
            'name': 'fake_name'
        }

        self.assertDictEqual(expected_dict, resp.json['workflows'][0])

    @mock.patch('mistral.db.v2.api.get_workflow_definitions')
    def test_get_all_with_fields_input_filter(self, mock_get_db_wfs):
        expected_dict = {
            'id': '65df1f59-938f-4c17-bc2a-562524ef5e40',
            'input': 'param1, param2="2"',
            'interface': {
                "output": [],
                "input": ["param1", {"param2": 2}]
            }
        }

        def mock_get_definitions(fields=None, session=None, **kwargs):
            # Swap the requested 'input' field for 'spec' in the caller's
            # list and return (id, spec) rows.
            if fields and 'input' in fields:
                fields.remove('input')
                fields.append('spec')

            return [
                ('65df1f59-938f-4c17-bc2a-562524ef5e40',
                 {'input': ['param1', {'param2': 2}]})
            ]

        mock_get_db_wfs.side_effect = mock_get_definitions

        resp = self.app.get('/v2/workflows?fields=input')

        self.assertEqual(200, resp.status_int)
        self.assertEqual(1, len(resp.json['workflows']))

        resp_workflow_json = resp.json['workflows'][0]
        resp_workflow_json['interface'] = \
            json.loads(resp_workflow_json['interface'])

        self.assertDictEqual(expected_dict, resp_workflow_json)

    def test_get_all_with_invalid_field(self):
        resp = self.app.get(
            '/v2/workflows?fields=name,nonexist',
            expect_errors=True
        )

        self.assertEqual(400, resp.status_int)

        response_msg = resp.body.decode()

        self.assertIn("nonexist", response_msg)
        self.assertIn("do not exist", response_msg)

    def test_validate(self):
        resp = self.app.post(
            '/v2/workflows/validate',
            WF_DEFINITION,
            headers={'Content-Type': 'text/plain'}
        )

        self.assertEqual(200, resp.status_int)
        self.assertTrue(resp.json['valid'])

    def test_validate_invalid_model_exception(self):
        resp = self.app.post(
            '/v2/workflows/validate',
            WF_DEF_INVALID_MODEL_EXCEPTION,
            headers={'Content-Type': 'text/plain'},
            expect_errors=True
        )

        # Validation failures are reported in the body, not the status.
        self.assertEqual(200, resp.status_int)
        self.assertFalse(resp.json['valid'])
        self.assertIn("Invalid DSL", resp.json['error'])

    def test_validate_dsl_parse_exception(self):
        resp = self.app.post(
            '/v2/workflows/validate',
            WF_DEF_DSL_PARSE_EXCEPTION,
            headers={'Content-Type': 'text/plain'},
            expect_errors=True
        )

        self.assertEqual(200, resp.status_int)
        self.assertFalse(resp.json['valid'])
        self.assertIn("Definition could not be parsed", resp.json['error'])

    def test_validate_yaql_parse_exception(self):
        resp = self.app.post(
            '/v2/workflows/validate',
            WF_DEF_YAQL_PARSE_EXCEPTION,
            headers={'Content-Type': 'text/plain'},
            expect_errors=True
        )

        self.assertEqual(200, resp.status_int)
        self.assertFalse(resp.json['valid'])
        self.assertIn("unexpected '*' at position 1", resp.json['error'])

    def test_validate_empty(self):
        resp = self.app.post(
            '/v2/workflows/validate',
            '',
            headers={'Content-Type': 'text/plain'},
            expect_errors=True
        )

        self.assertEqual(200, resp.status_int)
        self.assertFalse(resp.json['valid'])
        self.assertIn("Invalid DSL", resp.json['error'])

    @mock.patch("mistral.services.workflows.update_workflows")
    @mock.patch.object(db_api, "create_workflow_definition")
    def test_workflow_within_namespace(self, mock_mtd, update_mock):
        """POST and PUT both honour the 'namespace' query parameter."""
        mock_mtd.return_value = WF_DB_WITHIN_ABC_NAMESPACE

        namespace = 'abc'

        resp = self.app.post(
            '/v2/workflows?namespace=%s' % namespace,
            WF_DEFINITION,
            headers={'Content-Type': 'text/plain'}
        )

        self.assertEqual(201, resp.status_int)
        self.assertDictEqual({'workflows': [WF_WITH_NAMESPACE]}, resp.json)

        self.assertEqual(1, mock_mtd.call_count)

        spec = mock_mtd.call_args[0][0]['spec']

        self.assertIsNotNone(spec)
        self.assertEqual(WF_DB.name, spec['name'])
        self.assertEqual(WF_DB_WITHIN_ABC_NAMESPACE.namespace, namespace)

        update_mock.return_value = [WF_DB_WITHIN_ABC_NAMESPACE]

        id_ = '234560fe-162a-4060-a16a-a0d9eee9b408'

        resp = self.app.put(
            '/v2/workflows/%s?namespace=%s' % (id_, namespace),
            WF_DEFINITION,
            headers={'Content-Type': 'text/plain'}
        )

        self.assertEqual(200, resp.status_int)

        update_mock.assert_called_once_with(
            WF_DEFINITION,
            scope='private',
            identifier=id_,
            namespace='abc',
            validate=True
        )
        self.assertDictEqual(WF_WITH_NAMESPACE, resp.json)

    @mock.patch.object(db_api, "get_workflow_definition")
    def test_workflow_within_project_id(self, mock_get):
        mock_get.return_value = WF_DB_PROJECT_ID

        resp = self.app.get(
            '/v2/workflows/123e4567-e89b-12d3-a456-426655440000')

        self.assertEqual(200, resp.status_int)
        self.assertIn('project_id', resp.json)

    def test_post_checksum_workflow_added(self):
        resp = self.app.post(
            '/v2/workflows',
            WF_DEFINITION,
            headers={'Content-Type': 'text/plain'}
        )

        body = resp.json
        checksum = body['workflows'][0]['checksum']

        self.assertIsNotNone(checksum)

    def test_put_checksum_workflow_updated(self):
        """Updating a workflow's definition changes its checksum."""
        resp = self.app.post(
            '/v2/workflows',
            WF_DEFINITION,
            headers={'Content-Type': 'text/plain'}
        )

        body = resp.json
        checksum_old = body['workflows'][0]['checksum']

        resp = self.app.put(
            '/v2/workflows',
            WF_DEFINITION_UPDATED,
            headers={'Content-Type': 'text/plain'}
        )

        body = resp.json
        checksum = body['workflows'][0]['checksum']

        self.assertNotEqual(checksum_old, checksum)

    def test_checksum_has_md5_format(self):
        resp = self.app.post(
            '/v2/workflows',
            WF_DEFINITION,
            headers={'Content-Type': 'text/plain'}
        )

        body = resp.json
        checksum = body['workflows'][0]['checksum']

        self.assertTrue(self.is_valid_checksum(checksum))

    def is_valid_checksum(self, checksum):
        """Tell whether ``checksum`` is formatted like an MD5 hex digest.

        Hashing the value (as this helper used to do) succeeds for any
        string, so the format is checked explicitly instead.

        :param checksum: Value of the 'checksum' field of a workflow.
        :return: True if ``checksum`` is a string made of exactly as many
            hexadecimal characters as an MD5 hex digest has, False
            otherwise (including for non-string values).
        """
        if not isinstance(checksum, str):
            return False

        # Every digest byte is rendered as two hexadecimal characters.
        md5_hex_length = hashlib.md5().digest_size * 2

        if len(checksum) != md5_hex_length:
            return False

        return all(ch in '0123456789abcdefABCDEF' for ch in checksum)