added dynamic cluster create flow

- added create_cluster_node flow factory function for creating a
  cluster node
- modified create_cluster flow factory function to call
  create_cluster_node for each cluster node
- modified neutron and nova fixtures to support testing for
  cluster create flow

Change-Id: I92aae38d71baeca9a84385813180703bbb9c2ece
This commit is contained in:
Min Pae 2015-02-16 09:18:31 -08:00
parent 089c73d556
commit e7ce65c1be
14 changed files with 318 additions and 64 deletions

View File

@ -28,6 +28,9 @@ API_SERVICE_OPTS = [
default=1000,
help='The maximum number of items returned in a single '
'response from a collection resource.'),
# TODO(sputnik13): this needs to be removed when image selection is done
cfg.StrOpt('os_image_id',
help='The Image ID to use for VMs created as part of a cluster')
]
CONF = cfg.CONF

View File

@ -18,8 +18,6 @@
"""Version 1 of the Cue API
"""
import uuid
from cue.api.controllers import base
from cue.common import exception
from cue.common.i18n import _ # noqa
@ -30,6 +28,7 @@ from cue.taskflow import client as task_flow_client
from cue.taskflow.flow import create_cluster
from cue.taskflow.flow import delete_cluster
from oslo.config import cfg
from oslo.utils import uuidutils
import pecan
from pecan import rest
@ -38,6 +37,7 @@ from wsme import types as wtypes
import wsmeext.pecan as wsme_pecan
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class EndPoint(base.APIBase):
@ -216,18 +216,23 @@ class ClustersController(rest.RestController):
cluster.cluster = get_complete_cluster(context, new_cluster.id)
# prepare and post cluster create job to backend
flow_kwargs = {
'cluster_id': cluster.cluster.id,
'cluster_size': cluster.cluster.size,
}
job_args = {
"size": cluster.cluster.size,
"flavor": cluster.cluster.flavor,
# TODO(sputnik13): need to remove this when image selector is done
"image": CONF.api.os_image_id,
"volume_size": cluster.cluster.volume_size,
"network_id": cluster.cluster.network_id,
"cluster_status": "BUILDING",
"node_id": "node_id",
"port_name": "port_" + str(uuid.uuid4()),
}
job_client = task_flow_client.get_client_instance()
job_uuid = uuidutils.generate_uuid()
job_client.post(create_cluster, job_args, tx_uuid=job_uuid)
job_client.post(create_cluster, job_args,
flow_kwargs=flow_kwargs,
tx_uuid=job_uuid)
LOG.info(_LI('Create Cluster Request Cluster ID %(cluster_id)s Cluster'
' size %(size)s network ID %(network_id)s Job ID '

View File

@ -74,9 +74,9 @@ def get_client_instance(client_name=None, persistence=None, jobboard=None):
if _task_flow_client is None:
if persistence is None:
persistence = Client.create_persistence()
persistence = create_persistence()
if jobboard is None:
jobboard = Client.create_jobboard(persistence=persistence)
jobboard = create_jobboard(persistence=persistence)
if client_name is None:
client_name = "cue_job_client"

View File

@ -13,5 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
from create_cluster import create_cluster # noqa
from delete_cluster import delete_cluster # noqa
from create_cluster import create_cluster # noqa
from create_cluster_node import create_cluster_node # noqa
from delete_cluster import delete_cluster # noqa

View File

@ -14,31 +14,29 @@
# under the License.
import taskflow.patterns.linear_flow as linear_flow
import taskflow.retry as retry
import taskflow.patterns.unordered_flow as unordered_flow
import cue.client as client
import cue.taskflow.task as cue_task
import os_tasklib.common as common_task
import os_tasklib.neutron as neutron_task
import os_tasklib.nova as nova_task
from cue.taskflow.flow import create_cluster_node
def create_cluster():
flow = linear_flow.Flow('creating vm').add(
neutron_task.CreatePort(os_client=client.neutron_client(),
provides='neutron_port_id'),
nova_task.CreateVm(os_client=client.nova_client(),
requires=('name', 'image', 'flavor', 'nics'),
provides='nova_vm_id'),
linear_flow.Flow('wait for vm to become active',
retry=retry.Times(10)).add(
nova_task.GetVmStatus(os_client=client.nova_client(),
provides='vm_status'),
common_task.CheckFor(rebind={'check_var': 'vm_status'},
check_value='ACTIVE',
timeout_seconds=1),
),
cue_task.UpdateClusterStatus(cue_client="cue client"),
def create_cluster(cluster_id, cluster_size):
"""Create Cluster flow factory function
This factory function uses :func:`cue.taskflow.flow.create_cluster_node` to
create a multi node cluster.
:param cluster_id: A unique ID assigned to the cluster being created
:type cluster_id: string
:param cluster_size: The number of nodes in the cluster
:type cluster_size: number
:return: A flow instance that represents the workflow for creating a
cluster
"""
flow = linear_flow.Flow("creating cluster %s" % cluster_id)
sub_flow = unordered_flow.Flow("create VMs")
for i in range(cluster_size):
sub_flow.add(create_cluster_node.create_cluster_node(cluster_id, i))
flow.add(sub_flow)
)
return flow

View File

@ -0,0 +1,90 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import taskflow.patterns.linear_flow as linear_flow
import taskflow.retry as retry
import cue.client as client
import os_tasklib.common as os_common
import os_tasklib.neutron as neutron
import os_tasklib.nova as nova
def create_cluster_node(cluster_id, node_number):
"""Create Cluster Node factory function
This factory function creates a flow for creating a node of a cluster.
:param cluster_id: Unique ID for the cluster that the node is part of.
:type cluster_id: string
:param node_number: Cluster node # for the node being created.
:type node_number: number
:return: A flow instance that represents the workflow for creating a
cluster node.
"""
flow_name = "create cluster %s node %d" % (cluster_id, node_number)
node_name = "cluster[%s].node[%d]" % (cluster_id, node_number)
extract_port_id = (lambda port_info:
[{'port-id': port_info['port']['id']}])
extract_vm_id = lambda vm_info: vm_info['id']
extract_vm_status = lambda vm_info: vm_info['status']
flow = linear_flow.Flow(flow_name)
flow.add(
neutron.CreatePort(
name="create port %s" % node_name,
os_client=client.neutron_client(),
inject={'port_name': node_name},
provides="port_info_%d" % node_number),
os_common.Lambda(
extract_port_id,
name="extract port id %s" % node_name,
rebind={'port_info': "port_info_%d" % node_number},
provides="port_id_%d" % node_number),
nova.CreateVm(
name="create vm %s" % node_name,
os_client=client.nova_client(),
requires=('name', 'image', 'flavor', 'nics'),
inject={'name': node_name},
rebind={'nics': "port_id_%d" % node_number},
provides="vm_info_%d" % node_number),
os_common.Lambda(
extract_vm_id,
name="extract vm id %s" % node_name,
rebind={'vm_info': "vm_info_%d" % node_number},
provides="vm_id_%d" % node_number),
linear_flow.Flow(name="wait for active state %s" % node_name,
retry=retry.Times(12)).add(
nova.GetVm(
os_client=client.nova_client(),
name="get vm %s" % node_name,
rebind={'server': "vm_id_%d" % node_number},
provides="vm_info_%d" % node_number),
os_common.Lambda(
extract_vm_status,
name="extract vm status %s" % node_name,
rebind={'vm_info': "vm_info_%d" % node_number},
provides="vm_status_%d" % node_number),
os_common.CheckFor(
name="check vm status %s" % node_name,
rebind={'check_var': "vm_status_%d" % node_number},
check_value='ACTIVE',
timeout_seconds=10),
),
)
return flow

View File

@ -14,9 +14,9 @@
# under the License.
import base_task as base_task
import taskflow.task as task
class UpdateClusterStatus(base_task.BaseTask):
class UpdateClusterStatus(task.Task):
def execute(self, cluster_status, **kwargs):
print("Update Cluster Status to %s" % cluster_status)

View File

@ -0,0 +1,91 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from cue import client
from cue.taskflow.flow import create_cluster
from cue.tests import base
from cue.tests.test_fixtures import neutron
from cue.tests.test_fixtures import nova
from taskflow import engines
import taskflow.exceptions as taskflow_exc
class CreateClusterTests(base.TestCase):
additional_fixtures = [
nova.NovaClient,
neutron.NeutronClient
]
def setUp(self):
super(CreateClusterTests, self).setUp()
image_name = "cirros-0.3.2-x86_64-uec-kernel"
flavor_name = "m1.tiny"
network_name = "test-network"
self.nova_client = client.nova_client()
self.neutron_client = client.neutron_client()
self.new_vm_name = uuid.uuid4().hex
self.new_vm_list = []
self.valid_image = self.nova_client.images.find(name=image_name)
self.valid_flavor = self.nova_client.flavors.find(name=flavor_name)
network_list = self.neutron_client.list_networks(name=network_name)
self.valid_network = network_list['networks'][0]
def test_create_cluster(self):
flow_store = {
'image': self.valid_image.id,
'flavor': self.valid_flavor.id,
'network_id': self.valid_network['id']
}
cluster_id = uuid.uuid4().hex
cluster_size = 3
flow = create_cluster(cluster_id, cluster_size)
result = engines.run(flow, store=flow_store)
for i in range(cluster_size):
self.assertEqual("ACTIVE", result["vm_status_%d" % i])
self.new_vm_list.append(result["vm_id_%d" % i])
def test_create_cluster_overlimit(self):
vm_list = self.nova_client.servers.list()
port_list = self.neutron_client.list_ports()
flow_store = {
'image': self.valid_image.id,
'flavor': self.valid_flavor.id,
'network_id': self.valid_network['id']
}
cluster_id = uuid.uuid4().hex
cluster_size = 10
flow = create_cluster(cluster_id, cluster_size)
self.assertRaises(taskflow_exc.WrappedFailure, engines.run,
flow, store=flow_store)
self.assertEqual(vm_list, self.nova_client.servers.list())
self.assertEqual(port_list, self.neutron_client.list_ports())
def tearDown(self):
for vm_id in self.new_vm_list:
self.nova_client.servers.delete(vm_id)
super(CreateClusterTests, self).tearDown()

View File

@ -68,6 +68,7 @@ class NeutronClient(base.BaseFixture):
v2_client.create_network = self.create_network
v2_client.list_ports = self.list_ports
v2_client.list_networks = self.list_networks
v2_client.delete_port = self.delete_port
def create_port(self, body=None):
"""Mock'd version of neutronclient...create_port().
@ -87,7 +88,7 @@ class NeutronClient(base.BaseFixture):
else:
body = {'port': {}}
port_id = uuid.uuid4()
port_id = uuid.uuid4().hex
body['port']['id'] = port_id
self._port_list[port_id] = body['port']
return body
@ -106,7 +107,7 @@ class NeutronClient(base.BaseFixture):
else:
body = {'network': {}}
network_id = uuid.uuid4()
network_id = uuid.uuid4().hex
body['network'].update({
'id': network_id,
'subnets': [],
@ -141,4 +142,13 @@ class NeutronClient(base.BaseFixture):
for network in self._network_list.values():
if network[property] == value:
return {'networks': [network]}
return {'networks': [network]}
def delete_port(self, port):
try:
port_id = port.id
except AttributeError:
port_id = port
if port_id in self._port_list:
self._port_list.pop(port_id)

View File

@ -16,24 +16,27 @@
import uuid
import novaclient.exceptions as nova_exc
import novaclient.v1_1.client as nova_client
import novaclient.v2.client as nova_client
import cue.client as client
import cue.tests.test_fixtures.base as base
class VmDetails(object):
def __init__(self, vm_id, name, flavor, image):
def __init__(self, vm_id, name, flavor, image, status=None):
self.id = vm_id
self.name = name
self.flavor = flavor
self.image = image
self.status = status if status else 'ACTIVE'
def to_dict(self):
return {
'id': self.id,
'name': self.name,
'flavor': self.flavor,
'image': self.image,
'status': self.status
}
@ -73,19 +76,32 @@ class NovaClient(base.BaseFixture):
connection in the absence of a working Neutron API endpoint.
"""
def __init__(self, *args, **kwargs):
def __init__(self, image_list=None, flavor_list=None,
vm_limit=None, *args, **kwargs):
super(NovaClient, self).__init__(*args, **kwargs)
self._vm_list = {}
self._vm_list = dict()
self._image_list = dict()
self._flavor_list = dict()
image_detail = ImageDetails(name='cirros-0.3.2-x86_64-uec-kernel')
self._image_list = {
image_detail.id: image_detail
}
if not image_list:
image_list = ['cirros-0.3.2-x86_64-uec-kernel']
flavor_detail = FlavorDetails(name='m1.tiny')
self._flavor_list = {
flavor_detail.id: flavor_detail
}
if not flavor_list:
flavor_list = ['m1.tiny']
self._vm_limit = vm_limit if vm_limit else 3
for image in image_list:
image_detail = ImageDetails(name=image)
self._image_list.update({
image_detail.id: image_detail
})
for flavor in flavor_list:
flavor_detail = FlavorDetails(name=flavor)
self._flavor_list.update({
flavor_detail.id: flavor_detail
})
def setUp(self):
"""Set up test fixture and apply all method overrides."""
@ -108,6 +124,8 @@ class NovaClient(base.BaseFixture):
:return: An updated copy of the 'body' that was passed in, with other
information populated.
"""
if len(self._vm_list) >= self._vm_limit:
raise nova_exc.OverLimit(413)
try:
flavor_id = flavor.id
except AttributeError:
@ -119,10 +137,10 @@ class NovaClient(base.BaseFixture):
image_id = image
if not self._flavor_list.get(flavor_id):
raise nova_exc.BadRequest(404)
raise nova_exc.BadRequest(400)
if not self._image_list.get(image_id):
raise nova_exc.BadRequest(404)
raise nova_exc.BadRequest(400)
if nics is not None:
neutron_client = client.neutron_client()
@ -133,7 +151,9 @@ class NovaClient(base.BaseFixture):
if (not network_list or
not network_list.get('networks') or
len(network_list['networks']) == 0):
raise nova_exc.BadRequest(404)
raise nova_exc.BadRequest(400)
if nic.get('port-id'):
pass
newVm = VmDetails(vm_id=uuid.uuid4(), name=name,
flavor=flavor, image=image)

View File

@ -365,6 +365,12 @@ port=8795
max_limit=1000
#
# Openstack Image ID
# TODO(sputnik13): this needs to be removed when image selection is integrated
#
os_image_id=
[database]
#

View File

@ -46,7 +46,5 @@ class CheckFor(task.Task):
(self.check_value, check_var))
def revert(self, check_var, *args, **kwargs):
print("Check failed, expected %s, got %s" %
(self.check_value, check_var))
if self.sleep_time != 0:
time.sleep(self.sleep_time)

View File

@ -15,6 +15,12 @@
import os_tasklib
from cue.common.i18n import _LW # noqa
from cue.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class CreatePort(os_tasklib.BaseTask):
"""CreatePort Task
@ -48,7 +54,30 @@ class CreatePort(os_tasklib.BaseTask):
return port
def revert(self, **kwargs):
"""Revert function for a failed create port task."""
# TODO(dagnello): no action required for revert of a failed port create
# task, but logging should be added with a flow transaction ID which
# will provide context and state to the error.
"""Revert CreatePort Task
This method is executed upon failure of the CreatePort Task or the Flow
that the Task is part of.
:param args: positional arguments that the task required to execute.
:type args: list
:param kwargs: keyword arguments that the task required to execute; the
special key `result` will contain the :meth:`execute`
results (if any) and the special key `flow_failures`
will contain any failure information.
"""
if kwargs.get('tx_id'):
LOG.warning(_LW("%(tx_id)s Create Port failed %(result)s") %
{'tx_id': kwargs['tx_id'],
'result': kwargs['flow_failures']})
else:
LOG.warning(_LW("Create Port failed %s") % kwargs['flow_failures'])
port_info = kwargs.get('result')
if port_info and isinstance(port_info, dict):
try:
port_id = port_info['port']['id']
if port_id:
self.os_client.delete_port(port=port_id)
except KeyError:
pass

View File

@ -119,9 +119,9 @@ class CreateVm(os_tasklib.BaseTask):
return new_vm.to_dict()
def revert(self, *args, **kwargs):
"""Revert CreateVmTask
"""Revert CreateVm Task
This method is executed upon failure of the CreateVmTask or the Flow
This method is executed upon failure of the CreateVm Task or the Flow
that the Task is part of.
:param args: positional arguments that the task required to execute.
@ -141,6 +141,9 @@ class CreateVm(os_tasklib.BaseTask):
vm_info = kwargs.get('result')
if vm_info and isinstance(vm_info, dict):
vm_id = vm_info['id']
if vm_id:
self.os_client.servers.delete()
try:
vm_id = vm_info['id']
if vm_id:
self.os_client.servers.delete(vm_id)
except KeyError:
pass