Make running workflows more robust

Currently there is no fallback in case when Zaqar message does not
arrive to inform about the status of ongoing Mistral workflow.

This change introduces mechanism to make execution of workflows more
robust in TripleO UI:

Workflows should be triggered using startWorkflow action - startWorkflow
starts polling for workflow execution after a timeout expires and polls
until workflow execution is finished. Then it executes a callback (usually
action dispatch)

In case a relevant Zaqar message arrives, handleMessage action is triggered
which cancels the timeout, fetches the execution and passes it to callback.

Benefits:
* Application always gets to the result of a workflow execution, regardless
  whether the Zaqar message arrives or not
* Application does not depend on Zaqar message containing relevant data.
  Message is only required to deliver the execution ID, which is then used
  to fetch execution separately
* All executions are tracked in application state

Partial-Bug: #1753474
Change-Id: I2ae26986e5c2e8870238a287fbd3df46a92ed65f
This commit is contained in:
Jiri Tomasek 2018-03-02 14:52:59 +00:00
parent 16514bf462
commit 494af34c9b
14 changed files with 461 additions and 32 deletions

View File

@ -22,6 +22,7 @@ import NodesActions from '../../js/actions/NodesActions';
import * as ErrorActions from '../../js/actions/ErrorActions';
import NodesConstants from '../../js/constants/NodesConstants';
import MistralConstants from '../../js/constants/MistralConstants';
import * as WorkflowActions from '../../js/actions/WorkflowActions';
const mockGetNodesResponse = [{ uuid: 1 }, { uuid: 2 }];
@ -310,7 +311,7 @@ describe('startProvideNodes Action', () => {
const nodeIds = ['598612eb-f21b-435e-a868-7bb74e576cc2'];
beforeEach(() => {
MistralApiService.runWorkflow = jest
WorkflowActions.startWorkflow = jest
.fn()
.mockReturnValue(() => Promise.resolve({ state: 'RUNNING' }));
NodesActions.pollNodeslistDuringProgress = jest
@ -320,12 +321,7 @@ describe('startProvideNodes Action', () => {
it('dispatches actions', () =>
store.dispatch(NodesActions.startProvideNodes(nodeIds)).then(() => {
expect(MistralApiService.runWorkflow).toHaveBeenCalledWith(
MistralConstants.BAREMETAL_PROVIDE,
{
node_uuids: nodeIds
}
);
expect(WorkflowActions.startWorkflow).toHaveBeenCalled();
expect(NodesActions.pollNodeslistDuringProgress).toHaveBeenCalled();
expect(store.getActions()).toEqual([
NodesActions.startOperation(nodeIds)

View File

@ -0,0 +1,161 @@
/**
* Copyright 2018 Red Hat Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
jest.useFakeTimers();
import { mockStore } from './utils';
import MistralApiService from '../../js/services/MistralApiService';
import WorkflowExecutionsActions from '../../js/actions/WorkflowExecutionsActions';
import * as WorkflowActions from '../../js/actions/WorkflowActions';
import * as WorkflowExecutionsSelectors from '../../js/selectors/workflowExecutions';
import * as WorkflowExecutionTimeoutsSelectors from '../../js/selectors/workflowExecutionTimeouts';
describe('startWorkflow action', () => {
const store = mockStore({});
const execution = {
id: '047d76c2-637d-4199-841b-7abbe9e4706b'
};
beforeEach(() => {
MistralApiService.runWorkflow = jest
.fn()
.mockReturnValue(() => Promise.resolve(execution));
});
it('dispatches expected actions and sets timeout', () => {
const onFinished = jest.fn();
return store
.dispatch(
WorkflowActions.startWorkflow('TEST_WORKFLOW', {}, onFinished, 1000)
)
.then(() => {
expect(MistralApiService.runWorkflow).toHaveBeenCalledWith(
'TEST_WORKFLOW',
{}
);
expect(store.getActions()).toEqual([
WorkflowExecutionsActions.addWorkflowExecution(execution),
WorkflowActions.setWorkflowTimeout(execution.id, expect.any(Number))
]);
expect(setTimeout).toHaveBeenCalledWith(expect.any(Function), 1000);
});
});
});
describe('pollWorkflowExecution action for RUNNING execution', () => {
const store = mockStore({});
it('dispatches expected actions and sets timeout', () => {
const execution = {
id: '047d76c2-637d-4199-841b-7abbe9e4706b',
state: 'RUNNING'
};
MistralApiService.getWorkflowExecution = jest
.fn()
.mockReturnValue(() => Promise.resolve(execution));
return store
.dispatch(WorkflowActions.pollWorkflowExecution(execution.id, jest.fn()))
.then(() => {
expect(MistralApiService.getWorkflowExecution).toHaveBeenCalledWith(
execution.id
);
expect(store.getActions()).toEqual([
WorkflowExecutionsActions.addWorkflowExecution(execution),
WorkflowActions.setWorkflowTimeout(execution.id, expect.any(Number))
]);
expect(setTimeout).toHaveBeenCalledWith(expect.any(Function), 2500);
});
});
});
describe('pollWorkflowExecution action with finished execution', () => {
const store = mockStore({});
it('dispatches expected actions', () => {
const execution = {
id: '047d76c2-637d-4199-841b-7abbe9e4706b',
state: 'SUCCESS'
};
MistralApiService.getWorkflowExecution = jest
.fn()
.mockReturnValue(() => Promise.resolve(execution));
const onFinished = jest.fn();
return store
.dispatch(WorkflowActions.pollWorkflowExecution(execution.id, onFinished))
.then(() => {
expect(MistralApiService.getWorkflowExecution).toHaveBeenCalledWith(
execution.id
);
expect(store.getActions()).toEqual([
WorkflowExecutionsActions.addWorkflowExecution(execution),
WorkflowActions.cancelWorkflowTimeout(execution.id)
]);
expect(onFinished).toHaveBeenCalledWith(execution);
});
});
});
describe('handleWorkflowMessage action', () => {
it('dispatches expected actions', () => {
const store = mockStore({});
WorkflowExecutionsSelectors.getWorkflowExecution = jest
.fn()
.mockReturnValue({ id: 'running-execution-id', state: 'RUNNING' });
WorkflowExecutionTimeoutsSelectors.getWorkflowExecutionTimeout = jest
.fn()
.mockReturnValue({ 'running-execution-id': 1 });
store.dispatch(
WorkflowActions.handleWorkflowMessage('running-execution-id', jest.fn())
);
expect(clearTimeout).toHaveBeenCalled();
expect(store.getActions()).toEqual([
WorkflowActions.cancelWorkflowTimeout('running-execution-id')
]);
jest.clearAllMocks();
});
it('dispatches reacts to message when application has no record of the execution', () => {
const store = mockStore({});
WorkflowExecutionsSelectors.getWorkflowExecution = jest
.fn()
.mockReturnValue(undefined);
WorkflowExecutionTimeoutsSelectors.getWorkflowExecutionTimeout = jest
.fn()
.mockReturnValue(undefined);
store.dispatch(
WorkflowActions.handleWorkflowMessage('unknown-execution-id', jest.fn())
);
expect(clearTimeout).toHaveBeenCalled();
expect(store.getActions()).toEqual([
WorkflowActions.cancelWorkflowTimeout('unknown-execution-id')
]);
jest.clearAllMocks();
});
it('does not react to message when execution is not RUNNING', () => {
const store = mockStore({});
WorkflowExecutionsSelectors.getWorkflowExecution = jest
.fn()
.mockReturnValue({ id: 'finished-execution-id', state: 'SUCCESS' });
store.dispatch(
WorkflowActions.handleWorkflowMessage('finished-execution-id', jest.fn())
);
expect(clearTimeout).not.toHaveBeenCalled();
expect(store.getActions()).toEqual([]);
jest.clearAllMocks();
});
});

View File

@ -0,0 +1,55 @@
/**
* Copyright 2018 Red Hat Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
import { Map } from 'immutable';
import WorkflowExecutionsConstants from '../../js/constants/WorkflowExecutionsConstants';
import workflowExecutionTimeoutsReducer from '../../js/reducers/workflowExecutionTimeoutsReducer';
describe('workflowExecutionTimeoutsReducer', () => {
const initialState = Map();
const updatedState = Map({
'some-execution-id': 1
});
it('should return initial state', () => {
expect(workflowExecutionTimeoutsReducer(initialState, {})).toEqual(
initialState
);
});
it('should handle SET_WORKFLOW_TIMEOUT', () => {
const action = {
type: WorkflowExecutionsConstants.SET_WORKFLOW_TIMEOUT,
payload: {
executionId: 'some-execution-id',
timeout: 1
}
};
const newState = workflowExecutionTimeoutsReducer(initialState, action);
expect(newState).toEqual(updatedState);
});
it('should handle CANCEL_WORKFLOW_TIMEOUT', () => {
const action = {
type: WorkflowExecutionsConstants.CANCEL_WORKFLOW_TIMEOUT,
payload: 'some-execution-id'
};
const newState = workflowExecutionTimeoutsReducer(updatedState, action);
expect(newState).toEqual(initialState);
});
});

View File

@ -32,6 +32,7 @@ import {
} from '../normalizrSchemas/nodes';
import MistralConstants from '../constants/MistralConstants';
import { setNodeCapability } from '../utils/nodes';
import { startWorkflow } from './WorkflowActions';
const messages = defineMessages({
introspectionNotificationMessage: {
@ -255,9 +256,11 @@ export default {
dispatch(this.startOperation(nodeIds));
dispatch(this.pollNodeslistDuringProgress());
return dispatch(
MistralApiService.runWorkflow(MistralConstants.BAREMETAL_PROVIDE, {
node_uuids: nodeIds
})
startWorkflow(
MistralConstants.BAREMETAL_PROVIDE,
{ node_uuids: nodeIds },
execution => dispatch(this.provideNodesFinished(execution))
)
).catch(error => {
dispatch(handleErrors(error, 'Selected Nodes could not be provided'));
dispatch(this.finishOperation(nodeIds));
@ -265,28 +268,29 @@ export default {
};
},
provideNodesFinished(messagePayload) {
provideNodesFinished(execution) {
const { input, output, state } = execution;
return (dispatch, getState) => {
const nodeIds = messagePayload.execution.input.node_uuids;
const nodeIds = input.node_uuids;
dispatch(this.finishOperation(nodeIds));
dispatch(this.fetchNodes());
switch (messagePayload.status) {
switch (state) {
case 'SUCCESS': {
dispatch(
NotificationActions.notify({
type: 'success',
title: 'Nodes are available',
message: messagePayload.message
message: output.message
})
);
break;
}
case 'FAILED': {
case 'ERROR': {
dispatch(
NotificationActions.notify({
title: 'Some Nodes could not be provided',
message: messagePayload.message
message: output.message
.filter(message => message.result)
.map(message => message.result)
})

View File

@ -0,0 +1,115 @@
/**
* Copyright 2018 Red Hat Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
import { handleErrors } from './ErrorActions';
import MistralApiService from '../services/MistralApiService';
import WorkflowExecutionsActions from './WorkflowExecutionsActions';
import WorkflowExecutionConstants from '../constants/WorkflowExecutionsConstants';
import { getWorkflowExecution } from '../selectors/workflowExecutions';
import { getWorkflowExecutionTimeout } from '../selectors/workflowExecutionTimeouts';
/**
* @param {string} name - Name of the Mistral workflow to execute
* @param {object} input - Input for Mistral workflow
* @param {function} onFinished - function to execute after workflow execution
* @param {integer} timeout - time after which polling should start
* is finished, execution object is passed as a parameter
*
* startWorkflow triggers the Mistral workflow, then sets the timeout after
* which we start to poll for workflow execution, to get the result. timeout
* is stored in redux store, so it is possible to cancel it in case a Zaqar
* message arrives
*/
export const startWorkflow = (
name,
input,
onFinished,
timeout = 30000
) => dispatch =>
dispatch(MistralApiService.runWorkflow(name, input)).then(execution => {
dispatch(WorkflowExecutionsActions.addWorkflowExecution(execution));
const t = setTimeout(
() => dispatch(pollWorkflowExecution(execution.id, onFinished)),
timeout
);
dispatch(setWorkflowTimeout(execution.id, t));
return Promise.resolve(execution);
});
/**
* @param {string} executionId
* @param {function} onFinished - callback to execute after workflow execution
* is finished, execution object is passed as a parameter
* @param {number} timeout - poll timeout in milliseconds
*/
export const pollWorkflowExecution = (
executionId,
onFinished,
timeout = 2500
) => dispatch =>
dispatch(MistralApiService.getWorkflowExecution(executionId))
.then(execution => {
dispatch(WorkflowExecutionsActions.addWorkflowExecution(execution));
if (execution.state === 'RUNNING') {
const t = setTimeout(
() =>
dispatch(pollWorkflowExecution(executionId, onFinished, timeout)),
timeout
);
dispatch(setWorkflowTimeout(executionId, t));
} else {
dispatch(cancelWorkflowTimeout(executionId));
onFinished && onFinished(execution);
}
})
.catch(error =>
dispatch(handleErrors(error, 'Execution could not be loaded'))
);
/**
* @param {string} executionId
* @param {function} onFinished - callback to execute after workflow execution
* is finished, execution object is passed as a parameter
*
* This action is dispatched after a Zaqar message arrives, it reacts to message
* only in case when we know what to do with it and when it's execution is not
* finished or we have no record of it. It cancels the existing poll timeout and
* starts polling for execution itself.
*/
export const handleWorkflowMessage = (
executionId,
onFinished,
pollTimeout = 2500
) => (dispatch, getState) => {
const execution = getWorkflowExecution(getState(), executionId);
// react to message only when it's execution is not finished or we have no
// record of it
if (execution === undefined || execution.state === 'RUNNING') {
clearTimeout(getWorkflowExecutionTimeout(getState(), executionId));
dispatch(cancelWorkflowTimeout(executionId));
dispatch(pollWorkflowExecution(executionId, onFinished, pollTimeout));
}
};
export const setWorkflowTimeout = (executionId, timeout) => ({
type: WorkflowExecutionConstants.SET_WORKFLOW_TIMEOUT,
payload: { executionId, timeout }
});
export const cancelWorkflowTimeout = executionId => ({
type: WorkflowExecutionConstants.CANCEL_WORKFLOW_TIMEOUT,
payload: executionId
});

View File

@ -23,6 +23,7 @@ import RolesActions from './RolesActions';
import ValidationsActions from './ValidationsActions';
import MistralConstants from '../constants/MistralConstants';
import ZaqarWebSocketService from '../services/ZaqarWebSocketService';
import { handleWorkflowMessage } from './WorkflowActions';
export default {
handleAuthenticationSuccess(message, dispatch) {
@ -54,7 +55,11 @@ export default {
break;
case MistralConstants.BAREMETAL_PROVIDE:
dispatch(NodesActions.provideNodesFinished(payload));
dispatch(
handleWorkflowMessage(payload.execution.id, execution =>
dispatch(NodesActions.provideNodesFinished(execution))
)
);
break;
case MistralConstants.BAREMETAL_MANAGE:

View File

@ -22,5 +22,7 @@ export default keyMirror({
FETCH_WORKFLOW_EXECUTIONS_FAILED: null,
ADD_WORKFLOW_EXECUTION: null,
ADD_WORKFLOW_EXECUTION_FROM_MESSAGE: null,
UPDATE_WORKFLOW_EXECUTION_PENDING: null
UPDATE_WORKFLOW_EXECUTION_PENDING: null,
SET_WORKFLOW_TIMEOUT: null,
CANCEL_WORKFLOW_TIMEOUT: null
});

View File

@ -14,7 +14,13 @@
* under the License.
*/
import { Map, Record } from 'immutable';
import { Map, OrderedMap, Record } from 'immutable';
export const WorkflowExecutionsState = Record({
executionsLoaded: false,
isFetching: false,
executions: OrderedMap()
});
export const WorkflowExecution = Record({
description: undefined,

View File

@ -34,6 +34,7 @@ import { availableRolesReducer } from './rolesReducer';
import stacksReducer from './stacksReducer';
import validationsReducer from './validationsReducer';
import workflowExecutionsReducer from './workflowExecutionsReducer';
import workflowExecutionTimeoutsReducer from './workflowExecutionTimeoutsReducer';
const appReducer = combineReducers({
appConfig,
@ -53,7 +54,8 @@ const appReducer = combineReducers({
availableRoles: availableRolesReducer,
stacks: stacksReducer,
validations: validationsReducer,
form: formReducer
form: formReducer,
workflowExecutionTimeouts: workflowExecutionTimeoutsReducer
});
export default appReducer;

View File

@ -0,0 +1,34 @@
/**
* Copyright 2018 Red Hat Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
import { Map } from 'immutable';
import WorkflowExecutionsConstants from '../constants/WorkflowExecutionsConstants';
const workflowExecutionTimeoutsReducer = (state = Map(), action) => {
switch (action.type) {
case WorkflowExecutionsConstants.SET_WORKFLOW_TIMEOUT: {
const { executionId, timeout } = action.payload;
return state.set(executionId, timeout);
}
case WorkflowExecutionsConstants.CANCEL_WORKFLOW_TIMEOUT:
return state.delete(action.payload);
default:
return state;
}
};
export default workflowExecutionTimeoutsReducer;

View File

@ -14,16 +14,15 @@
* under the License.
*/
import { fromJS, Map, OrderedMap } from 'immutable';
import { fromJS } from 'immutable';
import WorkflowExecutionsConstants from '../constants/WorkflowExecutionsConstants';
import { WorkflowExecution } from '../immutableRecords/workflowExecutions';
import {
WorkflowExecution,
WorkflowExecutionsState
} from '../immutableRecords/workflowExecutions';
const initialState = Map({
executionsLoaded: false,
isFetching: false,
executions: OrderedMap()
});
const initialState = new WorkflowExecutionsState();
export default function workflowExecutionsReducer(
state = initialState,

View File

@ -0,0 +1,20 @@
/**
* Copyright 2018 Red Hat Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
export const getWorkflowExecutionTimeouts = state =>
state.workflowExecutionTimeouts;
export const getWorkflowExecutionTimeout = (state, executionId) =>
state.workflowExecutionTimeouts.get(executionId);

View File

@ -0,0 +1,19 @@
/**
* Copyright 2018 Red Hat Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
export const getWorkflowExecutions = state => state.executions.executions;
export const getWorkflowExecution = (state, executionId) =>
state.executions.executions.get(executionId);

View File

@ -24,7 +24,6 @@ import {
ConnectionError
} from './errors';
import { getServiceUrl, getAuthTokenId } from '../selectors/auth';
import MistralConstants from '../constants/MistralConstants';
class MistralApiService {
defaultRequest(path, additionalAttributes) {
@ -61,6 +60,21 @@ class MistralApiService {
.catch(handleErrors);
}
/**
* Get a Workflow execution
* Mistral API: GET /v2/executions/:execution_id
* @param {executionId} Execution ID
*/
getWorkflowExecution(executionId) {
return dispatch =>
dispatch(this.defaultRequest('/executions/' + executionId))
.then(response => {
const execution = parseExecutionAttrs(response.data);
return when.resolve(execution);
})
.catch(handleErrors);
}
/**
* Deletes a Workflow execution
* Mistral API: DELETE /v2/executions/:execution_id
@ -105,11 +119,8 @@ class MistralApiService {
if (response.data.state === 'ERROR') {
return when.reject(new MistralExecutionError(response));
} else if (workflowName === MistralConstants.VALIDATIONS_RUN) {
// Running validation is special case when whole execution needs to be returned
return when.resolve(response.data);
} else {
return when.resolve(response.data.output.result);
return when.resolve(response.data);
}
})
.catch(e => {