Adding anti-affinity for broker cluster nodes

* Also clean up the create cluster flow construction, since this patch
already touches that file.

Change-Id: Idc4436078b905425c3c91ef0b3ec950d08797410
Abitha Palaniappan 2015-11-11 15:14:15 -08:00 committed by Davide Agnello
parent bbcfbc0139
commit ea1462af37
18 changed files with 588 additions and 53 deletions
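
For context, the mechanism this patch builds on is Nova server groups: a group created with the 'anti-affinity' policy is handed to every boot request for the cluster through scheduler hints, so the scheduler places the broker nodes on distinct hosts. A minimal sketch against python-novaclient directly (the client, image and flavor objects here are assumptions, not part of the patch):

    # 'nova' is assumed to be an authenticated novaclient v2 Client;
    # 'image' and 'flavor' are assumed to be pre-existing objects.
    group = nova.server_groups.create(name='cue_group_example',
                                      policies=['anti-affinity'])
    for i in range(3):
        nova.servers.create(name='rabbit-node-%d' % i,
                            image=image,
                            flavor=flavor,
                            scheduler_hints={'group': group.id})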

View File

@ -178,14 +178,18 @@ class ClusterController(rest.RestController):
# create list with node id's for create cluster flow
node_ids = [node.id for node in nodes]
# retrieve cluster record
cluster = objects.Cluster.get_cluster_by_id(context, cluster_id)
# prepare and post cluster delete job to backend
flow_kwargs = {
'cluster_id': cluster_id,
'node_ids': node_ids,
'group_id': cluster.group_id,
}
job_args = {
"context": context.to_dict(),
'context': context.to_dict(),
}
job_client = task_flow_client.get_client_instance()
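
On the worker side this job is expected to be turned back into a TaskFlow run; a hedged sketch of that hand-off, assuming (as the flow factories below suggest) that flow_kwargs map onto the delete_cluster factory's arguments and job_args become the engine store:

    from taskflow import engines

    from cue.taskflow.flow import delete_cluster  # import path assumed

    flow = delete_cluster(cluster_id=flow_kwargs['cluster_id'],
                          node_ids=flow_kwargs['node_ids'],
                          group_id=flow_kwargs['group_id'])
    engines.run(flow, store=job_args)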

View File

@ -14,7 +14,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""add error_detail column in clusters
"""add error_detail and group_id column in clusters
Revision ID: 17c428e0479e
Revises: 244aa473e595
@ -26,16 +26,22 @@ Create Date: 2015-11-11 12:01:10.769280
revision = '17c428e0479e'
down_revision = '244aa473e595'
from cue.db.sqlalchemy import types
from alembic import op
from oslo_config import cfg
import sqlalchemy as sa
def upgrade():
op.add_column('clusters', sa.Column('error_detail', sa.Text))
op.add_column('clusters', sa.Column('error_detail', sa.Text(),
nullable=True))
op.add_column('clusters', sa.Column('group_id', types.UUID(),
nullable=True))
def downgrade():
db_connection = cfg.CONF.database.connection
if db_connection != "sqlite://": # pragma: nocover
op.drop_column('clusters', 'error_detail')
op.drop_column('clusters', 'error_detail')
op.drop_column('clusters', 'group_id')
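
A small verification sketch for the upgraded schema, assuming a SQLAlchemy engine already bound to the Cue database (the 'engine' variable is illustrative):

    import sqlalchemy as sa

    # engine = sa.create_engine(<cue database connection>)
    inspector = sa.inspect(engine)
    columns = {col['name'] for col in inspector.get_columns('clusters')}
    assert {'error_detail', 'group_id'} <= columns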

View File

@ -74,6 +74,7 @@ class Cluster(base.BASE, base.IdMixin, models.TimestampMixin,
size = sa.Column(sa.Integer(), default=1, nullable=False)
volume_size = sa.Column(sa.Integer(), nullable=True)
error_detail = sa.Column(sa.Text(), nullable=True)
group_id = sa.Column(types.UUID(), nullable=True)
sa.Index("clusters_cluster_id_idx", "cluster_id", unique=True)

View File

@ -39,6 +39,7 @@ class Cluster(base.CueObject):
'updated_at': obj_utils.datetime_or_str_or_none,
'deleted_at': obj_utils.datetime_or_str_or_none,
'error_detail': obj_utils.str_or_none,
'group_id': obj_utils.str_or_none,
}
@staticmethod

View File

@ -59,6 +59,10 @@ TF_OPTS = [
help="Number of times to check a node for status before "
"declaring it FAULTED",
default=30),
cfg.BoolOpt('cluster_node_anti_affinity',
help="Anti-affinity policy for cue cluster nodes",
default=False),
]
opt_group = cfg.OptGroup(
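
The new flag lives in the [taskflow] group (the group used by the tests and by the create_cluster flow below). A sketch of enabling it and of the read performed at flow-construction time; the cue.conf path is an assumption:

    # /etc/cue/cue.conf
    #
    #   [taskflow]
    #   cluster_node_anti_affinity = True

    from oslo_config import cfg

    # assumes Cue's option registration has already run
    anti_affinity = cfg.CONF.taskflow.cluster_node_anti_affinity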

View File

@ -18,10 +18,12 @@ import taskflow.patterns.graph_flow as graph_flow
import taskflow.patterns.linear_flow as linear_flow
import taskflow.retry as retry
import cue.client as client
from cue.db.sqlalchemy import models
from cue.taskflow.flow import create_cluster_node
import cue.taskflow.task as cue_tasks
import os_tasklib.common as os_common
import os_tasklib.nova as nova
def create_cluster(cluster_id, node_ids, user_network_id,
@ -42,63 +44,102 @@ def create_cluster(cluster_id, node_ids, user_network_id,
:return: A flow instance that represents the workflow for creating a
cluster
"""
cluster_name = "cue[%s]" % cluster_id
flow = graph_flow.Flow("creating cluster %s" % cluster_id)
start_flow_status = {'cluster_id': cluster_id,
'cluster_values': {'status': models.Status.BUILDING}}
end_flow_status = {'cluster_id': cluster_id,
'cluster_values': {'status': models.Status.ACTIVE}}
start_flow_cluster_update = {
'cluster_id': cluster_id,
'cluster_values': {'status': models.Status.BUILDING}}
start_task = cue_tasks.UpdateClusterRecord(
name="update cluster status start "
"%s" % cluster_id,
inject=start_flow_status)
flow.add(start_task)
extract_scheduler_hints = lambda vm_group: {'group': str(vm_group['id'])}
end_flow_cluster_update = lambda vm_group: {
'status': models.Status.ACTIVE,
'group_id': str(vm_group['id'])}
end_task = cue_tasks.UpdateClusterRecord(
name="update cluster status end "
"%s" % cluster_id,
inject=end_flow_status)
flow.add(end_task)
create_cluster_start_task = cue_tasks.UpdateClusterRecord(
name="update cluster status start %s" % cluster_name,
inject=start_flow_cluster_update)
flow.add(create_cluster_start_task)
cluster_anti_affinity = cfg.CONF.taskflow.cluster_node_anti_affinity
if cluster_anti_affinity:
create_vm_group = nova.CreateVmGroup(
name="create cluster group %s" % cluster_name,
os_client=client.nova_client(),
requires=('name', 'policies'),
inject={'name': "cue_group_%s" % cluster_id,
'policies': ['anti-affinity']},
provides="cluster_group")
flow.add(create_vm_group)
get_scheduler_hints = os_common.Lambda(
extract_scheduler_hints,
name="extract scheduler hints %s" % cluster_name,
rebind={'vm_group': "cluster_group"},
provides="scheduler_hints")
flow.add(get_scheduler_hints)
build_cluster_info = os_common.Lambda(
end_flow_cluster_update,
name="build new cluster update values %s" % cluster_name,
rebind={'vm_group': "cluster_group"},
provides="cluster_values")
flow.add(build_cluster_info)
flow.link(create_cluster_start_task, create_vm_group)
flow.link(create_vm_group, get_scheduler_hints)
flow.link(get_scheduler_hints, build_cluster_info)
create_node_start_task = build_cluster_info
create_cluster_end_task = cue_tasks.UpdateClusterRecord(
name="update cluster status end %s" % cluster_name,
inject={'cluster_id': cluster_id})
else:
create_node_start_task = create_cluster_start_task
end_flow_cluster_update = {
'cluster_id': cluster_id,
'cluster_values': {'status': models.Status.ACTIVE}}
create_cluster_end_task = cue_tasks.UpdateClusterRecord(
name="update cluster status end %s" % cluster_name,
inject=end_flow_cluster_update)
flow.add(create_cluster_end_task)
node_check_timeout = cfg.CONF.taskflow.cluster_node_check_timeout
node_check_max_count = cfg.CONF.taskflow.cluster_node_check_max_count
check_rabbit_online = linear_flow.Flow(
name="wait for RabbitMQ ready state",
name="wait for RabbitMQ ready state %s" % cluster_name,
retry=retry.Times(node_check_max_count, revert_all=True))
check_rabbit_online.add(
cue_tasks.GetRabbitClusterStatus(
name="get RabbitMQ status",
name="get RabbitMQ status %s" % cluster_name,
rebind={'vm_ip': "vm_management_ip_0"},
provides="clustering_status",
inject={'proto': 'http'}),
os_common.CheckFor(
name="check cluster status",
name="check cluster status %s" % cluster_name,
details="waiting for RabbitMQ clustered status",
rebind={'check_var': "clustering_status"},
check_value='OK',
retry_delay_seconds=10),
retry_delay_seconds=node_check_timeout),
)
flow.add(check_rabbit_online)
flow.link(check_rabbit_online, end_task)
flow.link(check_rabbit_online, create_cluster_end_task)
#todo(dagnello): verify node_ids is a list and not a string
for i, node_id in enumerate(node_ids):
generate_userdata = cue_tasks.ClusterNodeUserData(
"userdata_%d" % i,
len(node_ids),
"vm_management_ip_",
name="generate userdata %s_%d" % (cluster_name, i),
node_count=len(node_ids),
node_ip_prefix="vm_management_ip_",
inject={'node_name': "rabbit-node-%d" % i,
'cluster_id': cluster_id})
flow.add(generate_userdata)
create_cluster_node.create_cluster_node(cluster_id, i, node_id, flow,
generate_userdata, start_task,
check_rabbit_online,
node_check_timeout,
node_check_max_count,
user_network_id,
management_network_id)
create_cluster_node.create_cluster_node(
cluster_id=cluster_id, node_number=i, node_id=node_id,
graph_flow=flow, generate_userdata=generate_userdata,
start_task=create_node_start_task, post_task=check_rabbit_online,
user_network_id=user_network_id,
management_network_id=management_network_id)
return flow
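
Calling the factory mirrors the functional tests later in this patch: build the flow, then run it on a TaskFlow engine with a store that carries the image, flavor, port, context and RabbitMQ defaults (sketch; the import path is assumed, and the arguments are the factory's own parameters):

    from taskflow import engines

    from cue.taskflow.flow import create_cluster  # import path assumed

    flow = create_cluster(cluster_id, node_ids,
                          user_network_id, management_network_id)
    engines.run(flow, store=flow_store)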

View File

@ -37,7 +37,6 @@ CONF.register_opts(FLOW_OPTS, group='flow_options')
def create_cluster_node(cluster_id, node_number, node_id, graph_flow,
generate_userdata, start_task, post_task,
node_check_timeout, node_check_max_count,
user_network_id, management_network_id):
"""Create Cluster Node factory function
@ -57,10 +56,6 @@ def create_cluster_node(cluster_id, node_number, node_id, graph_flow,
:type post_task: taskflow task or flow
:param generate_userdata: generate user data task
:type generate_userdata: cue.taskflow.task.ClusterNodeUserData
:param node_check_timeout: seconds wait between node status checks
:type node_check_timeout: int
:param node_check_max_count: times to check for updated node status
:type node_check_max_count: int
:param user_network_id: The user's network id
:type user_network_id: string
:param management_network_id: The management network id
@ -141,7 +136,6 @@ def create_cluster_node(cluster_id, node_number, node_id, graph_flow,
graph_flow.link(create_vm, get_vm_id)
retry_count = CONF.flow_options.create_cluster_node_vm_active_retry_count
#todo(dagnello): make retry times configurable
check_vm_active = linear_flow.Flow(
name="wait for VM active state %s" % node_name,
retry=retry.Times(retry_count, revert_all=True))
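
For reference, the polling pattern being tuned here is a linear sub-flow with a Times retry wrapped around a status check; a condensed sketch of that shape (the retry count, delay and 'vm_status' key are illustrative only):

    import taskflow.patterns.linear_flow as linear_flow
    import taskflow.retry as retry

    import os_tasklib.common as os_common

    retry_count = 12  # the real value comes from CONF.flow_options

    check_vm_active = linear_flow.Flow(
        name="wait for VM active state",
        retry=retry.Times(retry_count, revert_all=True))
    check_vm_active.add(
        # a status task (e.g. GetVmStatus) providing 'vm_status' would sit
        # inside this retried flow just ahead of the check
        os_common.CheckFor(
            name="check vm active",
            details="waiting for ACTIVE VM status",
            rebind={'check_var': "vm_status"},
            check_value='ACTIVE',
            retry_delay_seconds=10))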

View File

@ -16,12 +16,14 @@
import taskflow.patterns.linear_flow as linear_flow
import taskflow.patterns.unordered_flow as unordered_flow
import cue.client as client
from cue.db.sqlalchemy import models
from cue.taskflow.flow import delete_cluster_node
import cue.taskflow.task as cue_tasks
import os_tasklib.nova as nova
def delete_cluster(cluster_id, node_ids):
def delete_cluster(cluster_id, node_ids, group_id):
"""Delete Cluster flow factory function
This factory function uses :func:`cue.taskflow.flow.delete_cluster_node` to
@ -31,9 +33,12 @@ def delete_cluster(cluster_id, node_ids):
:type cluster_id: string
:param node_ids: The Cue Node id's associated with each node in the cluster
:type node_ids: list of uuid's
:param group_id: The group id associated with the cluster
:type group_id: uuid
:return: A flow instance that represents the workflow for deleting a
cluster
"""
cluster_name = "cue[%s]" % cluster_id
flow = linear_flow.Flow("deleting cluster %s" % cluster_id)
sub_flow = unordered_flow.Flow("delete VMs")
start_flow_status = {'cluster_id': cluster_id,
@ -48,11 +53,16 @@ def delete_cluster(cluster_id, node_ids):
node_id))
flow.add(cue_tasks.UpdateClusterRecord(name="update cluster status start "
"%s" % cluster_id,
inject=start_flow_status))
"%s" % cluster_name,
inject=start_flow_status))
if group_id is not None:
flow.add(nova.DeleteVmGroup(name="delete nova user group %s %s" %
(group_id, cluster_name),
os_client=client.nova_client(),
inject={'group': group_id}))
flow.add(sub_flow)
flow.add(cue_tasks.UpdateClusterRecord(name="update cluster status end "
"%s" % cluster_id,
inject=end_flow_status))
"%s" % cluster_name,
inject=end_flow_status))
return flow
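
Usage follows the same shape as create_cluster, with the cluster's stored group_id now threaded through so the Nova server group is removed as part of teardown (sketch; import path assumed):

    from taskflow import engines

    from cue.taskflow.flow import delete_cluster  # import path assumed

    flow = delete_cluster(str(cluster.id), node_ids, cluster.group_id)
    engines.run(flow, store={'context': context.to_dict()})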

View File

@ -26,6 +26,7 @@ from oslo_utils import timeutils
UUID1 = str(uuid.uuid4())
UUID2 = str(uuid.uuid4())
UUID3 = str(uuid.uuid4())
UUID4 = str(uuid.uuid4())
class ModelsTests(base.FunctionalTestCase):
@ -47,6 +48,7 @@ class ModelsTests(base.FunctionalTestCase):
"updated_at": timeutils.utcnow(),
"deleted_at": timeutils.utcnow(),
"error_detail": "My cluster's error(s) detail",
"group_id": UUID4,
}
cluster = models.Cluster()
@ -79,6 +81,9 @@ class ModelsTests(base.FunctionalTestCase):
self.assertEqual(cluster_values["error_detail"],
cluster.error_detail,
"Invalid error_detail value")
self.assertEqual(cluster_values["group_id"],
cluster.group_id,
"Invalid group_id value")
db_session = sql_api.get_session()
cluster.save(db_session)
@ -114,6 +119,9 @@ class ModelsTests(base.FunctionalTestCase):
self.assertEqual(cluster_values["error_detail"],
cluster_db.error_detail,
"Invalid error_detail value")
self.assertEqual(cluster_values["group_id"],
cluster_db.group_id,
"Invalid group_id value")
def test_create_node_model(self):
"""Verifies a new cluster record is created in DB."""

View File

@ -24,13 +24,14 @@ import cue.tests.functional.fixtures.base as base
class VmDetails(object):
def __init__(self, vm_id, name, flavor, image,
port_list=None, status=None):
port_list=None, status=None, host_id=None):
self.id = vm_id
self.name = name
self.flavor = flavor
self.image = image
self.status = status if status else 'ACTIVE'
self.port_list = port_list
self.host_id = host_id
def to_dict(self):
return {
@ -105,6 +106,20 @@ class VmStatusDetails(object):
return status
class VmGroupDetails(object):
def __init__(self, vm_group_id, name, policies=None):
self.id = vm_group_id or str(uuid.uuid4())
self.name = name or 'cue_group'
self.policies = policies or ['anti-affinity']
def to_dict(self):
return {
'id': self.id,
'name': self.name,
'policies': self.policies
}
class NovaClient(base.BaseFixture):
"""A test fixture to simulate a Nova Client connection
@ -114,11 +129,12 @@ class NovaClient(base.BaseFixture):
def __init__(self, image_list=None, flavor_list=None,
vm_limit=None, security_group_list=None,
*args, **kwargs):
vm_group_list=None, *args, **kwargs):
super(NovaClient, self).__init__(*args, **kwargs)
self._vm_list = dict()
self._image_list = dict()
self._flavor_list = dict()
self._vm_group_list = dict()
if not image_list:
image_list = ['cirros-0.3.2-x86_64-uec-kernel']
@ -158,9 +174,12 @@ class NovaClient(base.BaseFixture):
v2_client.images.find = self.find_images
v2_client.images.list = self.list_images
v2_client.flavors.find = self.find_flavors
v2_client.server_groups.create = self.create_vm_group
v2_client.server_groups.delete = self.delete_vm_group
v2_client.server_groups.get = self.get_vm_group
def create_vm(self, name, image, flavor, nics=None, security_groups=None,
**kwargs):
scheduler_hints=None, **kwargs):
"""Mock'd version of novaclient...create_vm().
Create a Nova VM.
@ -225,6 +244,17 @@ class NovaClient(base.BaseFixture):
port_list=port_list,
status='BUILDING')
if scheduler_hints is not None:
try:
group_id = scheduler_hints['group']
except AttributeError:
group_id = scheduler_hints
if group_id not in self._vm_group_list:
raise nova_exc.BadRequest(400)
newVm.host_id = str(uuid.uuid4())
self._vm_list[str(newVm.id)] = newVm
return newVm
@ -330,3 +360,42 @@ class NovaClient(base.BaseFixture):
raise nova_exc.NotFound(404)
return server.port_list
def create_vm_group(self, name, policies, **kwargs):
"""Mock'd version of novaclient...server_group_create().
Create a Nova server group.
:param name: Server group name.
:param policies: Server group policy list.
:return: An updated copy of the 'body' that was passed in, with other
information populated.
"""
newVmGroup = VmGroupDetails(vm_group_id=str(uuid.uuid4()), name=name,
policies=policies)
self._vm_group_list[newVmGroup.id] = newVmGroup
return newVmGroup
def get_vm_group(self, id, **kwargs):
"""Mock'd version of novaclient...server_group_get()
:param id: vm server group id
:return: current server group object for specified vm id
"""
try:
vm_group = self._vm_group_list[str(id)]
except KeyError:
raise nova_exc.NotFound(404)
return vm_group
def delete_vm_group(self, id, **kwargs):
"""Mock'd version of novaclient...server_group_delete()
:param id: vm server group id
"""
try:
del (self._vm_group_list[str(id)])
except KeyError:
raise nova_exc.NotFound(404)
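
The three mocked endpoints can be exercised directly, which is what the new task tests below do; a short sketch, assuming the fixture is active so that client.nova_client() hands back the patched client:

    import cue.client as client

    nova = client.nova_client()

    # create, look up, then delete a server group against the in-memory mock
    group = nova.server_groups.create(name='cue_group_demo',
                                      policies=['anti-affinity'])
    assert nova.server_groups.get(group.id).policies == ['anti-affinity']
    nova.server_groups.delete(group.id)
    # a further get or delete on the same id now raises novaclient's NotFound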

View File

@ -240,6 +240,54 @@ class CreateClusterTests(base.FunctionalTestCase):
else:
self.fail("Expected taskflow_exc.WrappedFailure exception.")
def test_create_cluster_anti_affinity(self):
self.flags(cluster_node_anti_affinity=True, group="taskflow")
flow_store = {
'image': self.valid_image.id,
'flavor': self.valid_flavor.id,
"port": self.port,
"context": self.context.to_dict(),
"erlang_cookie": str(uuid.uuid4()),
"default_rabbit_user": 'rabbit',
"default_rabbit_pass": str(uuid.uuid4()),
}
cluster_values = {
"project_id": self.context.tenant_id,
"name": "RabbitCluster",
"network_id": str(uuid.uuid4()),
"flavor": "1",
"size": 3,
}
new_cluster = objects.Cluster(**cluster_values)
new_cluster.create(self.context)
nodes = objects.Node.get_nodes_by_cluster_id(self.context,
new_cluster.id)
node_ids = []
for node in nodes:
node_ids.append(node.id)
flow = create_cluster(new_cluster.id,
node_ids,
self.valid_network['id'],
self.management_network['id'])
engines.run(flow, store=flow_store)
nodes_after = objects.Node.get_nodes_by_cluster_id(self.context,
new_cluster.id)
# check if the host_ids are different for cluster nodes
host_ids = []
for node in nodes_after:
host_id = self.nova_client.servers.get(node.instance_id).host_id
self.assertNotIn(host_id, host_ids)
host_ids.append(host_id)
def tearDown(self):
for vm_id in self.new_vm_list:
self.nova_client.servers.delete(vm_id)

View File

@ -26,6 +26,7 @@ from cue.tests.functional.fixtures import neutron
from cue.tests.functional.fixtures import nova
from cue.tests.functional.fixtures import urllib2_fixture
import novaclient.exceptions as nova_exc
from taskflow import engines
import taskflow.exceptions as taskflow_exc
@ -114,10 +115,9 @@ class DeleteClusterTests(base.FunctionalTestCase):
node_ids.append(str(node.id))
flow_create = create_cluster(new_cluster.id,
node_ids,
self.valid_network['id'],
self.management_network['id'])
flow_delete = delete_cluster(str(new_cluster.id), node_ids)
node_ids,
self.valid_network['id'],
self.management_network['id'])
result = engines.run(flow_create, store=flow_store_create)
@ -148,6 +148,8 @@ class DeleteClusterTests(base.FunctionalTestCase):
self.assertEqual(uri, endpoint.uri, "invalid endpoint uri")
self.assertEqual('AMQP', endpoint.type, "invalid endpoint type")
flow_delete = delete_cluster(str(new_cluster.id), node_ids,
cluster_after.group_id)
result = engines.run(flow_delete, store=flow_store_delete)
nodes_after = objects.Node.get_nodes_by_cluster_id(self.context,
@ -166,6 +168,67 @@ class DeleteClusterTests(base.FunctionalTestCase):
node.id)
self.assertEqual(0, len(endpoints), "endpoints were not deleted")
def test_delete_cluster_anti_affinity(self):
self.flags(cluster_node_anti_affinity=True, group="taskflow")
flow_store_create = {
"image": self.valid_image.id,
"flavor": self.valid_flavor.id,
"port": self.port,
"context": self.context.to_dict(),
"erlang_cookie": str(uuid.uuid4()),
"default_rabbit_user": 'rabbit',
"default_rabbit_pass": str(uuid.uuid4()),
}
flow_store_delete = {
"context": self.context.to_dict(),
}
cluster_values = {
"project_id": self.context.tenant_id,
"name": "RabbitCluster",
"network_id": str(uuid.uuid4()),
"flavor": "1",
"size": 3,
}
new_cluster = objects.Cluster(**cluster_values)
new_cluster.create(self.context)
nodes = objects.Node.get_nodes_by_cluster_id(self.context,
new_cluster.id)
node_ids = []
for node in nodes:
node_ids.append(str(node.id))
flow_create = create_cluster(new_cluster.id,
node_ids,
self.valid_network['id'],
self.management_network['id'])
engines.run(flow_create, store=flow_store_create)
cluster_after = objects.Cluster.get_cluster_by_id(self.context,
new_cluster.id)
self.assertEqual(models.Status.ACTIVE, cluster_after.status,
"Invalid status for cluster")
flow_delete = delete_cluster(str(new_cluster.id), node_ids,
cluster_after.group_id)
engines.run(flow_delete, store=flow_store_delete)
self.assertRaises(exception.NotFound,
objects.Cluster.get_cluster_by_id,
self.context,
new_cluster.id)
# verify server group is not found
self.assertRaises(nova_exc.NotFound,
self.nova_client.server_groups.get,
cluster_after.group_id)
def test_delete_invalid_cluster(self):
vm_list = self.nova_client.servers.list()
port_list = self.neutron_client.list_ports()
@ -176,11 +239,12 @@ class DeleteClusterTests(base.FunctionalTestCase):
cluster_size = 3
cluster_id = str(uuid.uuid4())
server_group_id = str(uuid.uuid4())
node_ids = []
for i in range(0, cluster_size):
node_ids.append(str(uuid.uuid4()))
flow_delete = delete_cluster(cluster_id, node_ids)
flow_delete = delete_cluster(cluster_id, node_ids, server_group_id)
self.assertRaises(taskflow_exc.WrappedFailure, engines.run,
flow_delete, store=flow_store_delete)

View File

@ -0,0 +1,61 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from cue import client
from cue.tests.functional import base
from cue.tests.functional.fixtures import nova
import os_tasklib.nova.create_vm_group as create_vm_group
from taskflow import engines
from taskflow.patterns import linear_flow
class CreateVmGroupTests(base.FunctionalTestCase):
additional_fixtures = [
nova.NovaClient
]
def setUp(self):
super(CreateVmGroupTests, self).setUp()
self.nova_client = client.nova_client()
self.new_vm_group_name = str(uuid.uuid4())
self.new_vm_group_id = None
self.flow = linear_flow.Flow("create vm group flow")
self.flow.add(
create_vm_group.CreateVmGroup(
os_client=self.nova_client,
requires=('name', 'policies'),
provides='new_vm_group',
rebind={'name': 'vm_group_name'}
)
)
def test_create_vm_group(self):
"""Verifies CreateVMGroup task directly."""
flow_store = {
'vm_group_name': self.new_vm_group_name,
'policies': ['anti-affinity']
}
result = engines.run(self.flow, store=flow_store)
self.new_vm_group_id = result['new_vm_group']['id']
vm_group = self.nova_client.server_groups.get(self.new_vm_group_id)
self.assertEqual(self.new_vm_group_name, vm_group.name)

View File

@ -0,0 +1,96 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from cue import client
from cue.tests.functional import base
from cue.tests.functional.fixtures import nova
import os_tasklib.nova.delete_vm_group as delete_vm_group
import novaclient.exceptions as nova_exc
from taskflow import engines
from taskflow.patterns import linear_flow
class DeleteVmGroupTests(base.FunctionalTestCase):
additional_fixtures = [
nova.NovaClient
]
task_store = {
'group': "0",
}
def setUp(self):
super(DeleteVmGroupTests, self).setUp()
self.nova_client = client.nova_client()
self.new_vm_group_name = str(uuid.uuid4())
self.new_vm_group_id = None
self.flow = linear_flow.Flow("delete vm group flow")
self.flow.add(
delete_vm_group.DeleteVmGroup(os_client=self.nova_client)
)
def test_delete_server_group_invalid_id(self):
"""Verifies Delete non-existing server group."""
# create a few server groups
server_groups = [
self.nova_client.server_groups.create(name="server_group_1",
policies=['anti-affinity']),
self.nova_client.server_groups.create(name="server_group_2",
policies=['affinity']),
self.nova_client.server_groups.create(name="server_group_3",
policies=['anti-affinity'])]
self.task_store['group'] = str(uuid.uuid4())
# start engine to run delete task
engines.run(self.flow, store=DeleteVmGroupTests.task_store)
# retrieve all server groups
server_groups_found = []
for server_group in server_groups:
server_groups_found.append(
self.nova_client.server_groups.get(server_group.id))
# verify the number of server groups did not change
self.assertEqual(len(server_groups), len(server_groups_found),
"Not all server groups were found")
def test_delete_server_group(self):
"""Verifies Delete existing server group."""
# create a few server groups
server_groups = [
self.nova_client.server_groups.create(name="server_group_1",
policies=['anti-affinity']),
self.nova_client.server_groups.create(name="server_group_2",
policies=['anti-affinity']),
self.nova_client.server_groups.create(name="server_group_3",
policies=['affinity'])]
# select a server group to delete
self.task_store['group'] = server_groups[0].id
# start engine to run delete server group task
engines.run(self.flow, store=DeleteVmGroupTests.task_store)
# verify deleted server group is not found
self.assertRaises(nova_exc.NotFound,
self.nova_client.server_groups.get,
server_groups[0].id)

View File

@ -76,6 +76,7 @@ jobboard_name 'cue' Board name
engine_type 'serial' Engine type
cluster_node_check_timeout 10 Number of seconds between node status checks
cluster_node_check_max_count 30 Number of times to check a node for status
cluster_node_anti_affinity False Anti-affinity policy for cue cluster nodes
============================= ==================================== ==============================================================
[openstack]

View File

@ -14,7 +14,9 @@
# under the License.
from os_tasklib.nova.create_vm import CreateVm # noqa
from os_tasklib.nova.create_vm_group import CreateVmGroup # noqa
from os_tasklib.nova.delete_vm import DeleteVm # noqa
from os_tasklib.nova.delete_vm_group import DeleteVmGroup # noqa
from os_tasklib.nova.get_vm import GetVm # noqa
from os_tasklib.nova.get_vm_status import GetVmStatus # noqa
from os_tasklib.nova.list_vm_interfaces import ListVmInterfaces # noqa

View File

@ -0,0 +1,80 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os_tasklib
from cue.common.i18n import _LW # noqa
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
class CreateVmGroup(os_tasklib.BaseTask):
"""CreateVmGroup Task
This task interfaces with Nova API and creates a VM group based on
parameters provided to the Task.
"""
def execute(self, name, policies):
"""Main execute method
:param name: Name of the server group
:type name: string
:param policies: policy is either affinity or anti-affinity
:type policies: list
:rtype: dict
"""
new_vm_group = self.os_client.server_groups.create(
name=name,
policies=policies
)
return new_vm_group.to_dict()
def revert(self, *args, **kwargs):
"""Revert CreateVmGroup Task
This method is executed upon failure of the CreateVmGroup Task or the
Flow that the Task is part of.
:param args: positional arguments that the task required to execute.
:type args: list
:param kwargs: keyword arguments that the task required to execute; the
special key `result` will contain the :meth:`execute`
results (if any) and the special key `flow_failures`
will contain any failure information.
"""
if kwargs.get('tx_id'):
LOG.warning(_LW("%(tx_id)s Create VM Group failed %(result)s") %
{'tx_id': kwargs['tx_id'],
'result': kwargs['flow_failures']})
else:
LOG.warning(_LW("Create VM Group failed %s") %
kwargs['flow_failures'])
vm_group_info = kwargs.get('result')
if vm_group_info and isinstance(vm_group_info, dict):
try:
vm_group_id = vm_group_info['id']
if vm_group_id:
self.os_client.server_groups.delete(vm_group_id)
except KeyError:
pass
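
Usage follows the same pattern as the other os_tasklib tasks and matches both the create_cluster flow and the new functional test above: add the task to a flow, feed 'name' and 'policies' through the store, and read the group id from the provided result (sketch; the flow and result names are illustrative):

    from taskflow import engines
    from taskflow.patterns import linear_flow

    import cue.client as client
    import os_tasklib.nova as nova

    flow = linear_flow.Flow("create vm group")
    flow.add(nova.CreateVmGroup(os_client=client.nova_client(),
                                requires=('name', 'policies'),
                                provides='cluster_group'))

    result = engines.run(flow, store={'name': 'cue_group_demo',
                                      'policies': ['anti-affinity']})
    group_id = result['cluster_group']['id']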

View File

@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os_tasklib
from cue.common.i18n import _LW # noqa
import novaclient.exceptions as nova_exc
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
class DeleteVmGroup(os_tasklib.BaseTask):
"""DeleteVmGroup Task
This task interfaces with Nova API and deletes a VM server group.
"""
def execute(self, group):
"""Main execute method
:param group: ID of the server group to delete
:type group: string
:return: n/a
"""
try:
self.os_client.server_groups.delete(id=group)
except nova_exc.NotFound:
LOG.warning(_LW("Server group was not found %s") % group)
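
The matching teardown sketch, again following the delete_cluster flow and the functional tests: inject the 'group' id (here assumed to come from an earlier create step) and let the task swallow the NotFound case:

    from taskflow import engines
    from taskflow.patterns import linear_flow

    import cue.client as client
    import os_tasklib.nova as nova

    # group_id: the server group id produced by CreateVmGroup (assumed)
    flow = linear_flow.Flow("delete vm group")
    flow.add(nova.DeleteVmGroup(os_client=client.nova_client(),
                                inject={'group': group_id}))
    engines.run(flow)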