Adding 'workflow_params' to cron triggers

TODO:
 - create some tests on triggers with engine

Closes-Bug: #1449522

Change-Id: I68cc1121f8b1b7c211644a032c788b1361e831db
(cherry picked from commit c3a8bfebb5)
This commit is contained in:
Nikolay Mahotkin 2015-04-30 15:09:18 +03:00 committed by Renat Akhmerov
parent 930fcb2cde
commit a72fddfb1f
8 changed files with 104 additions and 15 deletions

View File

@ -34,6 +34,7 @@ class CronTrigger(resource.Resource):
name = wtypes.text
workflow_name = wtypes.text
workflow_input = wtypes.text
workflow_params = wtypes.text
scope = SCOPE_TYPES
@ -48,11 +49,21 @@ class CronTrigger(resource.Resource):
def to_dict(self):
d = super(CronTrigger, self).to_dict()
if d.get('workflow_input'):
d['workflow_input'] = json.loads(d['workflow_input'])
self._transform_string_to_dict(
d, ['workflow_input', 'workflow_params']
)
return d
def _transform_string_to_dict(self, d, keys):
"""Transforms values of dict by given key list.
:param d: dict to transform.
:param keys: list of key names in dict
"""
for k in keys:
if d.get(k):
d[k] = json.loads(d[k])
@classmethod
def from_dict(cls, d):
e = cls()
@ -60,7 +71,8 @@ class CronTrigger(resource.Resource):
for key, val in d.items():
if hasattr(e, key):
# Nonetype check for dictionary must be explicit.
if key == 'workflow_input' and val is not None:
if (key in ['workflow_input', 'workflow_params']
and val is not None):
val = json.dumps(val)
setattr(e, key, val)
@ -73,6 +85,7 @@ class CronTrigger(resource.Resource):
name='my_trigger',
workflow_name='my_wf',
workflow_input={},
workflow_params={},
scope='private',
pattern='* * * * *',
remaining_executions=42,
@ -115,6 +128,7 @@ class CronTriggersController(rest.RestController):
values['name'],
values['workflow_name'],
values.get('workflow_input'),
values.get('workflow_params'),
values.get('pattern'),
values.get('first_execution_time'),
values.get('remaining_executions')

View File

@ -0,0 +1,52 @@
# Copyright 2015 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Kilo
Revision ID: 002
Revises: 001
Create Date: 2015-04-30 16:15:34.737030
"""
# revision identifiers, used by Alembic.
revision = '002'
down_revision = '001'
from alembic import op
import sqlalchemy as sa
from mistral.db.sqlalchemy import types as st
def upgrade():
op.drop_table('tasks')
op.drop_table('workflow_executions')
op.drop_table('workbooks')
op.drop_table('triggers')
op.add_column(
'cron_triggers_v2',
sa.Column('workflow_params', st.JsonEncoded(), nullable=True)
)
op.add_column(
'cron_triggers_v2',
sa.Column('workflow_params_hash', sa.CHAR(length=64), nullable=True)
)
op.create_unique_constraint(
None,
'cron_triggers_v2',
['workflow_input_hash', 'workflow_name', 'pattern',
'project_id', 'workflow_params_hash']
)

View File

@ -45,7 +45,7 @@ def do_check_migration(config, _cmd):
do_alembic_command(config, 'branches')
def do_upgrade_downgrade(config, cmd):
def do_upgrade(config, cmd):
if not CONF.command.revision and not CONF.command.delta:
raise SystemExit('You must provide a revision or relative delta')
@ -89,7 +89,7 @@ def add_command_parsers(subparsers):
parser.add_argument('--delta', type=int)
parser.add_argument('--sql', action='store_true')
parser.add_argument('revision', nargs='?')
parser.set_defaults(func=do_upgrade_downgrade)
parser.set_defaults(func=do_upgrade)
parser = subparsers.add_parser('populate')
parser.set_defaults(func=do_populate)

View File

@ -230,10 +230,13 @@ class Environment(mb.MistralSecureModelBase):
variables = sa.Column(st.JsonDictType())
def _calc_workflow_input_hash(context):
d = context.current_parameters['workflow_input'] or {}
def _get_hash_function_by(column_name):
def calc_hash(context):
d = context.current_parameters[column_name] or {}
return hashlib.sha256(json.dumps(sorted(d.items()))).hexdigest()
return hashlib.sha256(json.dumps(sorted(d.items()))).hexdigest()
return calc_hash
class CronTrigger(mb.MistralSecureModelBase):
@ -244,7 +247,8 @@ class CronTrigger(mb.MistralSecureModelBase):
__table_args__ = (
sa.UniqueConstraint('name', 'project_id'),
sa.UniqueConstraint(
'workflow_input_hash', 'workflow_name', 'pattern', 'project_id'
'workflow_input_hash', 'workflow_name', 'pattern', 'project_id',
'workflow_params_hash'
)
)
@ -261,10 +265,15 @@ class CronTrigger(mb.MistralSecureModelBase):
)
workflow = relationship('WorkflowDefinition', lazy='joined')
workflow_params = sa.Column(st.JsonDictType())
workflow_params_hash = sa.Column(
sa.CHAR(64),
default=_get_hash_function_by('workflow_params')
)
workflow_input = sa.Column(st.JsonDictType())
workflow_input_hash = sa.Column(
sa.CHAR(64),
default=_calc_workflow_input_hash
default=_get_hash_function_by('workflow_input')
)
trust_id = sa.Column(sa.String(80))

View File

@ -40,7 +40,8 @@ class MistralPeriodicTasks(periodic_task.PeriodicTasks):
try:
rpc.get_engine_client().start_workflow(
t.workflow.name,
t.workflow_input
t.workflow_input,
**t.workflow_params
)
finally:
if t.remaining_executions > 0:

View File

@ -1,6 +1,6 @@
# Copyright 2014 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
@ -51,8 +51,9 @@ def validate_cron_trigger_input(pattern, first_time, count):
" valid: {}".format(pattern))
def create_cron_trigger(name, workflow_name, workflow_input, pattern=None,
first_time=None, count=None, start_time=None):
def create_cron_trigger(name, workflow_name, workflow_input,
workflow_params=None, pattern=None, first_time=None,
count=None, start_time=None):
if not start_time:
start_time = datetime.datetime.now()
@ -82,7 +83,8 @@ def create_cron_trigger(name, workflow_name, workflow_input, pattern=None,
'remaining_executions': count,
'workflow_name': workflow_name,
'workflow_id': wf.id,
'workflow_input': workflow_input,
'workflow_input': workflow_input or {},
'workflow_params': workflow_params or {},
'scope': 'private'
}

View File

@ -32,6 +32,7 @@ TRIGGER = {
'pattern': '* * * * *',
'workflow_name': WF.name,
'workflow_input': '{}',
'workflow_params': '{}',
'scope': 'private',
'remaining_executions': 42
}
@ -40,6 +41,9 @@ trigger_values = copy.copy(TRIGGER)
trigger_values['workflow_input'] = json.loads(
trigger_values['workflow_input'])
trigger_values['workflow_params'] = json.loads(
trigger_values['workflow_params'])
TRIGGER_DB = models.CronTrigger()
TRIGGER_DB.update(trigger_values)

View File

@ -50,6 +50,7 @@ class TriggerServiceV2Test(base.DbTestCase):
'test',
self.wf.name,
{},
{},
'*/5 * * * *',
None,
None,
@ -73,6 +74,7 @@ class TriggerServiceV2Test(base.DbTestCase):
'test',
self.wf.name,
{},
{},
None,
"4242-12-25 13:37",
None,
@ -97,6 +99,7 @@ class TriggerServiceV2Test(base.DbTestCase):
'test',
self.wf.name,
{},
{},
'*/2 * * * *',
None,
None,
@ -110,6 +113,7 @@ class TriggerServiceV2Test(base.DbTestCase):
'test1',
self.wf.name,
{},
{},
'*/5 * * * *',
None,
None,
@ -120,6 +124,7 @@ class TriggerServiceV2Test(base.DbTestCase):
'test2',
self.wf.name,
{},
{},
'*/1 * * * *',
None,
None,
@ -130,6 +135,7 @@ class TriggerServiceV2Test(base.DbTestCase):
'test3',
self.wf.name,
{},
{},
'*/2 * * * *',
None,
None,
@ -140,6 +146,7 @@ class TriggerServiceV2Test(base.DbTestCase):
'test4',
self.wf.name,
{},
{},
'*/3 * * * *',
None,
None,