Drop direct engine support
This change remove support of direct engine in Sahara. Unit tests related to direct engine were removed. Implements blueprint: remove-direct-engine Change-Id: Iba292996dd052a069a98f1024441255a15c0f734
This commit is contained in:
parent
a670d330c0
commit
76b310414c
|
@ -41,19 +41,22 @@ LOG = log.getLogger(__name__)
|
|||
opts = [
|
||||
cfg.StrOpt('os_region_name',
|
||||
help='Region name used to get services endpoints.'),
|
||||
cfg.StrOpt('infrastructure_engine',
|
||||
default='heat',
|
||||
help='An engine which will be used to provision '
|
||||
'infrastructure for Hadoop cluster.'),
|
||||
cfg.StrOpt('remote',
|
||||
default='ssh',
|
||||
help='A method for Sahara to execute commands '
|
||||
'on VMs.'),
|
||||
cfg.IntOpt('api_workers', default=1,
|
||||
help="Number of workers for Sahara API service (0 means "
|
||||
"all-in-one-thread configuration).")
|
||||
"all-in-one-thread configuration)."),
|
||||
# TODO(vgridnev): Remove in N release
|
||||
cfg.StrOpt('infrastructure_engine',
|
||||
default='heat',
|
||||
help='An engine which will be used to provision '
|
||||
'infrastructure for Hadoop cluster.',
|
||||
deprecated_for_removal=True),
|
||||
]
|
||||
|
||||
INFRASTRUCTURE_ENGINE = 'heat'
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(opts)
|
||||
|
||||
|
@ -127,17 +130,13 @@ def _load_driver(namespace, name):
|
|||
|
||||
def _get_infrastructure_engine():
|
||||
"""Import and return one of sahara.service.*_engine.py modules."""
|
||||
|
||||
LOG.debug("Infrastructure engine {engine} is loading".format(
|
||||
if CONF.infrastructure_engine != "heat":
|
||||
LOG.warning(_LW("Engine {engine} is not supported. Loading Heat "
|
||||
"infrastructure engine instead.").format(
|
||||
engine=CONF.infrastructure_engine))
|
||||
|
||||
if CONF.infrastructure_engine == "direct":
|
||||
LOG.warning(_LW("Direct infrastructure engine is deprecated in Liberty"
|
||||
" release and will be removed after that release."
|
||||
" Use Heat infrastructure engine instead."))
|
||||
|
||||
return _load_driver('sahara.infrastructure.engine',
|
||||
CONF.infrastructure_engine)
|
||||
LOG.debug("Infrastructure engine {engine} is loading".format(
|
||||
engine=INFRASTRUCTURE_ENGINE))
|
||||
return _load_driver('sahara.infrastructure.engine', INFRASTRUCTURE_ENGINE)
|
||||
|
||||
|
||||
def _get_remote_driver():
|
||||
|
|
|
@ -1,490 +0,0 @@
|
|||
# Copyright (c) 2013 Mirantis Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import six
|
||||
|
||||
from sahara import conductor as c
|
||||
from sahara import context
|
||||
from sahara import exceptions as exc
|
||||
from sahara.i18n import _
|
||||
from sahara.i18n import _LI
|
||||
from sahara.i18n import _LW
|
||||
from sahara.service import engine as e
|
||||
from sahara.service import networks
|
||||
from sahara.service import volumes
|
||||
from sahara.utils import cluster as c_u
|
||||
from sahara.utils import cluster_progress_ops as cpo
|
||||
from sahara.utils import general as g
|
||||
from sahara.utils.openstack import neutron
|
||||
from sahara.utils.openstack import nova
|
||||
from sahara.utils import poll_utils
|
||||
|
||||
conductor = c.API
|
||||
CONF = cfg.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
SSH_PORT = 22
|
||||
|
||||
|
||||
def _warning_logger():
|
||||
LOG.warning(_LW("Direct infrastructure engine is deprecated in Liberty"
|
||||
" release and will be removed after that release."
|
||||
" Use Heat infrastructure engine instead."))
|
||||
|
||||
|
||||
class DirectEngine(e.Engine):
|
||||
def get_type_and_version(self):
|
||||
return "direct.1.0"
|
||||
|
||||
def create_cluster(self, cluster):
|
||||
_warning_logger()
|
||||
ctx = context.ctx()
|
||||
self._update_rollback_strategy(cluster, shutdown=True)
|
||||
|
||||
# create all instances
|
||||
cluster = c_u.change_cluster_status(
|
||||
cluster, c_u.CLUSTER_STATUS_SPAWNING)
|
||||
self._create_instances(cluster)
|
||||
|
||||
# wait for all instances are up and networks ready
|
||||
cluster = c_u.change_cluster_status(
|
||||
cluster, c_u.CLUSTER_STATUS_WAITING)
|
||||
instances = c_u.get_instances(cluster)
|
||||
|
||||
self._await_active(cluster, instances)
|
||||
|
||||
self._assign_floating_ips(instances)
|
||||
|
||||
self._await_networks(cluster, instances)
|
||||
|
||||
cluster = conductor.cluster_get(ctx, cluster)
|
||||
|
||||
# attach volumes
|
||||
volumes.attach_to_instances(c_u.get_instances(cluster))
|
||||
|
||||
# prepare all instances
|
||||
cluster = c_u.change_cluster_status(
|
||||
cluster, c_u.CLUSTER_STATUS_PREPARING)
|
||||
|
||||
self._configure_instances(cluster)
|
||||
|
||||
self._update_rollback_strategy(cluster)
|
||||
|
||||
def scale_cluster(self, cluster, node_group_id_map):
|
||||
_warning_logger()
|
||||
ctx = context.ctx()
|
||||
cluster = c_u.change_cluster_status(
|
||||
cluster, c_u.CLUSTER_STATUS_SCALING_SPAWNING)
|
||||
|
||||
instance_ids = self._scale_cluster_instances(cluster,
|
||||
node_group_id_map)
|
||||
|
||||
self._update_rollback_strategy(cluster, instance_ids=instance_ids)
|
||||
|
||||
cluster = conductor.cluster_get(ctx, cluster)
|
||||
c_u.clean_cluster_from_empty_ng(cluster)
|
||||
|
||||
cluster = conductor.cluster_get(ctx, cluster)
|
||||
instances = c_u.get_instances(cluster, instance_ids)
|
||||
|
||||
self._await_active(cluster, instances)
|
||||
|
||||
self._assign_floating_ips(instances)
|
||||
|
||||
self._await_networks(cluster, instances)
|
||||
|
||||
cluster = conductor.cluster_get(ctx, cluster)
|
||||
|
||||
volumes.attach_to_instances(
|
||||
c_u.get_instances(cluster, instance_ids))
|
||||
|
||||
# we should be here with valid cluster: if instances creation
|
||||
# was not successful all extra-instances will be removed above
|
||||
if instance_ids:
|
||||
self._configure_instances(cluster)
|
||||
|
||||
self._update_rollback_strategy(cluster)
|
||||
|
||||
return instance_ids
|
||||
|
||||
def rollback_cluster(self, cluster, reason):
|
||||
_warning_logger()
|
||||
rollback_info = cluster.rollback_info or {}
|
||||
self._update_rollback_strategy(cluster)
|
||||
|
||||
if rollback_info.get('shutdown', False):
|
||||
self._rollback_cluster_creation(cluster, reason)
|
||||
LOG.warning(_LW("Cluster creation rollback "
|
||||
"(reason: {reason})").format(reason=reason))
|
||||
return False
|
||||
|
||||
instance_ids = rollback_info.get('instance_ids', [])
|
||||
if instance_ids:
|
||||
self._rollback_cluster_scaling(
|
||||
cluster,
|
||||
c_u.get_instances(cluster, instance_ids), reason)
|
||||
LOG.warning(_LW("Cluster scaling rollback "
|
||||
"(reason: {reason})").format(reason=reason))
|
||||
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def _update_rollback_strategy(self, cluster, shutdown=False,
|
||||
instance_ids=None):
|
||||
rollback_info = {}
|
||||
if shutdown:
|
||||
rollback_info['shutdown'] = shutdown
|
||||
|
||||
if instance_ids:
|
||||
rollback_info['instance_ids'] = instance_ids
|
||||
|
||||
cluster = conductor.cluster_update(
|
||||
context.ctx(), cluster, {'rollback_info': rollback_info})
|
||||
return cluster
|
||||
|
||||
# TODO(alazarev) remove when we fully switch to server groups
|
||||
def _generate_anti_affinity_groups(self, cluster):
|
||||
aa_groups = {}
|
||||
|
||||
for node_group in cluster.node_groups:
|
||||
for instance in node_group.instances:
|
||||
if instance.instance_id:
|
||||
for process in node_group.node_processes:
|
||||
if process in cluster.anti_affinity:
|
||||
aa_group = aa_groups.get(process, [])
|
||||
aa_group.append(instance.instance_id)
|
||||
aa_groups[process] = aa_group
|
||||
|
||||
return aa_groups
|
||||
|
||||
def _create_instances(self, cluster):
|
||||
ctx = context.ctx()
|
||||
|
||||
cluster = self._create_auto_security_groups(cluster)
|
||||
|
||||
aa_group = None
|
||||
if cluster.anti_affinity:
|
||||
aa_group = self._create_aa_server_group(cluster)
|
||||
cpo.add_provisioning_step(
|
||||
cluster.id, _("Run instances"),
|
||||
c_u.count_instances(cluster))
|
||||
|
||||
for node_group in cluster.node_groups:
|
||||
count = node_group.count
|
||||
conductor.node_group_update(ctx, node_group, {'count': 0})
|
||||
for idx in six.moves.xrange(1, count + 1):
|
||||
self._start_instance(
|
||||
cluster, node_group, idx, aa_group=aa_group)
|
||||
|
||||
def _create_aa_server_group(self, cluster):
|
||||
server_group_name = g.generate_aa_group_name(cluster.name)
|
||||
client = nova.client().server_groups
|
||||
|
||||
if client.findall(name=server_group_name):
|
||||
raise exc.InvalidDataException(
|
||||
_("Server group with name %s is already exists")
|
||||
% server_group_name)
|
||||
|
||||
server_group = client.create(name=server_group_name,
|
||||
policies=['anti-affinity'])
|
||||
return server_group.id
|
||||
|
||||
def _find_aa_server_group(self, cluster):
|
||||
server_group_name = g.generate_aa_group_name(cluster.name)
|
||||
server_groups = nova.client().server_groups.findall(
|
||||
name=server_group_name)
|
||||
|
||||
if len(server_groups) > 1:
|
||||
raise exc.IncorrectStateError(
|
||||
_("Several server groups with name %s found")
|
||||
% server_group_name)
|
||||
|
||||
if len(server_groups) == 1:
|
||||
return server_groups[0].id
|
||||
|
||||
return None
|
||||
|
||||
def _create_auto_security_groups(self, cluster):
|
||||
ctx = context.ctx()
|
||||
for node_group in cluster.node_groups:
|
||||
if node_group.auto_security_group:
|
||||
self._create_auto_security_group(node_group)
|
||||
|
||||
return conductor.cluster_get(ctx, cluster)
|
||||
|
||||
def _count_instances_to_scale(self, node_groups_to_enlarge,
|
||||
node_group_id_map, cluster):
|
||||
|
||||
total_count = 0
|
||||
if node_groups_to_enlarge:
|
||||
for ng in cluster.node_groups:
|
||||
if ng.id in node_groups_to_enlarge:
|
||||
count = node_group_id_map[ng.id]
|
||||
total_count += count - ng.count
|
||||
|
||||
return total_count
|
||||
|
||||
def _start_instance(self, cluster, node_group, idx, aa_group,
|
||||
old_aa_groups=None):
|
||||
|
||||
instance_name = g.generate_instance_name(
|
||||
cluster.name, node_group.name, idx)
|
||||
|
||||
current_instance_info = context.InstanceInfo(
|
||||
cluster.id, None, instance_name, node_group.id)
|
||||
|
||||
with context.InstanceInfoManager(current_instance_info):
|
||||
instance_id = self._run_instance(
|
||||
cluster, node_group, idx,
|
||||
aa_group=aa_group, old_aa_groups=old_aa_groups)
|
||||
|
||||
return instance_id
|
||||
|
||||
def _scale_cluster_instances(self, cluster, node_group_id_map):
|
||||
ctx = context.ctx()
|
||||
|
||||
aa_group = None
|
||||
old_aa_groups = None
|
||||
if cluster.anti_affinity:
|
||||
aa_group = self._find_aa_server_group(cluster)
|
||||
if not aa_group:
|
||||
old_aa_groups = self._generate_anti_affinity_groups(cluster)
|
||||
|
||||
instances_to_delete = []
|
||||
node_groups_to_enlarge = set()
|
||||
node_groups_to_delete = set()
|
||||
|
||||
for node_group in cluster.node_groups:
|
||||
new_count = node_group_id_map[node_group.id]
|
||||
|
||||
if new_count < node_group.count:
|
||||
instances_to_delete += node_group.instances[new_count:
|
||||
node_group.count]
|
||||
if new_count == 0:
|
||||
node_groups_to_delete.add(node_group.id)
|
||||
elif new_count > node_group.count:
|
||||
node_groups_to_enlarge.add(node_group.id)
|
||||
if node_group.count == 0 and node_group.auto_security_group:
|
||||
self._create_auto_security_group(node_group)
|
||||
|
||||
if instances_to_delete:
|
||||
cluster = c_u.change_cluster_status(
|
||||
cluster, c_u.CLUSTER_STATUS_DELETING_INSTANCES)
|
||||
|
||||
for instance in instances_to_delete:
|
||||
with context.set_current_instance_id(instance.instance_id):
|
||||
self._shutdown_instance(instance)
|
||||
|
||||
self._await_deleted(cluster, instances_to_delete)
|
||||
for ng in cluster.node_groups:
|
||||
if ng.id in node_groups_to_delete:
|
||||
self._delete_auto_security_group(ng)
|
||||
|
||||
cluster = conductor.cluster_get(ctx, cluster)
|
||||
instances_to_add = []
|
||||
if node_groups_to_enlarge:
|
||||
|
||||
cpo.add_provisioning_step(
|
||||
cluster.id, _("Add instances"),
|
||||
self._count_instances_to_scale(
|
||||
node_groups_to_enlarge, node_group_id_map, cluster))
|
||||
|
||||
cluster = c_u.change_cluster_status(
|
||||
cluster, c_u.CLUSTER_STATUS_ADDING_INSTANCES)
|
||||
for ng in cluster.node_groups:
|
||||
if ng.id in node_groups_to_enlarge:
|
||||
count = node_group_id_map[ng.id]
|
||||
for idx in six.moves.xrange(ng.count + 1, count + 1):
|
||||
instance_id = self._start_instance(
|
||||
cluster, ng, idx, aa_group, old_aa_groups)
|
||||
instances_to_add.append(instance_id)
|
||||
|
||||
return instances_to_add
|
||||
|
||||
def _map_security_groups(self, security_groups):
|
||||
if not security_groups:
|
||||
# Nothing to do here
|
||||
return None
|
||||
|
||||
if CONF.use_neutron:
|
||||
# When using Neutron, ids work fine.
|
||||
return security_groups
|
||||
else:
|
||||
# Nova Network requires that security groups are passed by names.
|
||||
# security_groups.get method accepts both ID and names, so in case
|
||||
# IDs are provided they will be converted, otherwise the names will
|
||||
# just map to themselves.
|
||||
names = []
|
||||
for group_id_or_name in security_groups:
|
||||
group = nova.client().security_groups.get(group_id_or_name)
|
||||
names.append(group.name)
|
||||
return names
|
||||
|
||||
@cpo.event_wrapper(mark_successful_on_exit=True)
|
||||
def _run_instance(self, cluster, node_group, idx, aa_group=None,
|
||||
old_aa_groups=None):
|
||||
"""Create instance using nova client and persist them into DB."""
|
||||
ctx = context.ctx()
|
||||
name = g.generate_instance_name(cluster.name, node_group.name, idx)
|
||||
|
||||
userdata = self._generate_user_data_script(node_group, name)
|
||||
|
||||
if old_aa_groups:
|
||||
# aa_groups: node process -> instance ids
|
||||
aa_ids = []
|
||||
for node_process in node_group.node_processes:
|
||||
aa_ids += old_aa_groups.get(node_process) or []
|
||||
|
||||
# create instances only at hosts w/ no instances
|
||||
# w/ aa-enabled processes
|
||||
hints = {'different_host': sorted(set(aa_ids))} if aa_ids else None
|
||||
else:
|
||||
hints = {'group': aa_group} if (
|
||||
aa_group and self._need_aa_server_group(node_group)) else None
|
||||
|
||||
security_groups = self._map_security_groups(node_group.security_groups)
|
||||
nova_kwargs = {'scheduler_hints': hints, 'userdata': userdata,
|
||||
'key_name': cluster.user_keypair_id,
|
||||
'security_groups': security_groups,
|
||||
'availability_zone': node_group.availability_zone}
|
||||
if CONF.use_neutron:
|
||||
net_id = cluster.neutron_management_network
|
||||
nova_kwargs['nics'] = [{"net-id": net_id, "v4-fixed-ip": ""}]
|
||||
|
||||
nova_instance = nova.client().servers.create(name,
|
||||
node_group.get_image_id(),
|
||||
node_group.flavor_id,
|
||||
**nova_kwargs)
|
||||
instance_id = conductor.instance_add(ctx, node_group,
|
||||
{"instance_id": nova_instance.id,
|
||||
"instance_name": name})
|
||||
|
||||
if old_aa_groups:
|
||||
# save instance id to aa_groups to support aa feature
|
||||
for node_process in node_group.node_processes:
|
||||
if node_process in cluster.anti_affinity:
|
||||
aa_group_ids = old_aa_groups.get(node_process, [])
|
||||
aa_group_ids.append(nova_instance.id)
|
||||
old_aa_groups[node_process] = aa_group_ids
|
||||
|
||||
return instance_id
|
||||
|
||||
def _create_auto_security_group(self, node_group):
|
||||
name = g.generate_auto_security_group_name(node_group)
|
||||
nova_client = nova.client()
|
||||
security_group = nova_client.security_groups.create(
|
||||
name, "Auto security group created by Sahara for Node Group '%s' "
|
||||
"of cluster '%s'." %
|
||||
(node_group.name, node_group.cluster.name))
|
||||
|
||||
# ssh remote needs ssh port, agents are not implemented yet
|
||||
nova_client.security_group_rules.create(
|
||||
security_group.id, 'tcp', SSH_PORT, SSH_PORT, "0.0.0.0/0")
|
||||
|
||||
# open all traffic for private networks
|
||||
if CONF.use_neutron:
|
||||
for cidr in neutron.get_private_network_cidrs(node_group.cluster):
|
||||
for protocol in ['tcp', 'udp']:
|
||||
nova_client.security_group_rules.create(
|
||||
security_group.id, protocol, 1, 65535, cidr)
|
||||
|
||||
nova_client.security_group_rules.create(
|
||||
security_group.id, 'icmp', -1, -1, cidr)
|
||||
|
||||
# enable ports returned by plugin
|
||||
for port in node_group.open_ports:
|
||||
nova_client.security_group_rules.create(
|
||||
security_group.id, 'tcp', port, port, "0.0.0.0/0")
|
||||
|
||||
security_groups = list(node_group.security_groups or [])
|
||||
security_groups.append(security_group.id)
|
||||
conductor.node_group_update(context.ctx(), node_group,
|
||||
{"security_groups": security_groups})
|
||||
return security_groups
|
||||
|
||||
def _need_aa_server_group(self, node_group):
|
||||
for node_process in node_group.node_processes:
|
||||
if node_process in node_group.cluster.anti_affinity:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _assign_floating_ips(self, instances):
|
||||
for instance in instances:
|
||||
with context.set_current_instance_id(instance.instance_id):
|
||||
node_group = instance.node_group
|
||||
if node_group.floating_ip_pool:
|
||||
networks.assign_floating_ip(instance.instance_id,
|
||||
node_group.floating_ip_pool)
|
||||
|
||||
@poll_utils.poll_status(
|
||||
'await_for_instances_active',
|
||||
_("Wait for instances to become active"), sleep=1)
|
||||
def _check_active(self, active_ids, cluster, instances):
|
||||
if not c_u.check_cluster_exists(cluster):
|
||||
return True
|
||||
for instance in instances:
|
||||
if instance.id not in active_ids:
|
||||
with context.set_current_instance_id(instance.instance_id):
|
||||
if self._check_if_active(instance):
|
||||
active_ids.add(instance.id)
|
||||
cpo.add_successful_event(instance)
|
||||
return len(instances) == len(active_ids)
|
||||
|
||||
def _await_active(self, cluster, instances):
|
||||
"""Await all instances are in Active status and available."""
|
||||
if not instances:
|
||||
return
|
||||
|
||||
cpo.add_provisioning_step(
|
||||
cluster.id, _("Wait for instances to become active"),
|
||||
len(instances))
|
||||
|
||||
active_ids = set()
|
||||
self._check_active(active_ids, cluster, instances)
|
||||
|
||||
LOG.info(_LI("All instances are active"))
|
||||
|
||||
@cpo.event_wrapper(mark_successful_on_exit=False)
|
||||
def _check_if_active(self, instance):
|
||||
server = nova.get_instance_info(instance)
|
||||
if server.status == 'ERROR':
|
||||
raise exc.SystemError(_("Node %s has error status") % server.name)
|
||||
|
||||
return server.status == 'ACTIVE'
|
||||
|
||||
def _rollback_cluster_creation(self, cluster, ex):
|
||||
"""Shutdown all instances and update cluster status."""
|
||||
|
||||
self.shutdown_cluster(cluster)
|
||||
|
||||
def _rollback_cluster_scaling(self, cluster, instances, ex):
|
||||
"""Attempt to rollback cluster scaling."""
|
||||
|
||||
for i in instances:
|
||||
with context.set_current_instance_id(i.instance_id):
|
||||
self._shutdown_instance(i)
|
||||
|
||||
cluster = conductor.cluster_get(context.ctx(), cluster)
|
||||
c_u.clean_cluster_from_empty_ng(cluster)
|
||||
|
||||
def shutdown_cluster(self, cluster):
|
||||
"""Shutdown specified cluster and all related resources."""
|
||||
self._shutdown_instances(cluster)
|
||||
self._clean_job_executions(cluster)
|
||||
self._delete_aa_server_group(cluster)
|
||||
self._remove_db_objects(cluster)
|
|
@ -1,358 +0,0 @@
|
|||
# Copyright (c) 2013 Mirantis Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import mock
|
||||
from novaclient import exceptions as nova_exceptions
|
||||
|
||||
from sahara import conductor as cond
|
||||
from sahara import context
|
||||
from sahara.service import direct_engine as e
|
||||
from sahara.service import ops
|
||||
from sahara.tests.unit import base
|
||||
from sahara.utils import cluster as cluster_utils
|
||||
from sahara.utils import crypto as c
|
||||
|
||||
|
||||
conductor = cond.API
|
||||
|
||||
|
||||
class AbstractInstanceTest(base.SaharaWithDbTestCase):
|
||||
def setUp(self):
|
||||
super(AbstractInstanceTest, self).setUp()
|
||||
|
||||
self.engine = e.DirectEngine()
|
||||
|
||||
self.is_passthrough_patcher = mock.patch(
|
||||
'sahara.conductor.resource.Resource._is_passthrough_type')
|
||||
self.is_passthrough_patcher.start().return_value = True
|
||||
|
||||
self.novaclient_patcher = mock.patch(
|
||||
'sahara.utils.openstack.nova.client')
|
||||
self.nova = _create_nova_mock(self.novaclient_patcher.start())
|
||||
self.nova.server_groups.findall.return_value = []
|
||||
self.nova.floating_ips.findall.__name__ = 'findall'
|
||||
self.nova.floating_ips.delete.__name__ = 'delete'
|
||||
|
||||
self.get_userdata_patcher = mock.patch(
|
||||
'sahara.utils.remote.get_userdata_template')
|
||||
self.get_userdata_patcher.start().return_value = ''
|
||||
|
||||
def tearDown(self):
|
||||
self.get_userdata_patcher.stop()
|
||||
self.novaclient_patcher.stop()
|
||||
self.is_passthrough_patcher.stop()
|
||||
|
||||
super(AbstractInstanceTest, self).tearDown()
|
||||
|
||||
|
||||
class TestClusterRollBack(AbstractInstanceTest):
|
||||
|
||||
@mock.patch('sahara.service.direct_engine.DirectEngine._check_if_deleted')
|
||||
@mock.patch('sahara.service.ops._prepare_provisioning')
|
||||
@mock.patch('sahara.service.ops.INFRA')
|
||||
def test_cluster_creation_with_errors(self, infra, prepare,
|
||||
deleted_checker):
|
||||
infra.create_cluster.side_effect = self.engine.create_cluster
|
||||
infra.rollback_cluster.side_effect = self.engine.rollback_cluster
|
||||
|
||||
node_groups = [_make_ng_dict('test_group', 'test_flavor',
|
||||
['data node', 'task tracker'], 2)]
|
||||
|
||||
cluster = _create_cluster_mock(node_groups, [])
|
||||
|
||||
prepare.return_value = (context.ctx(), cluster, mock.Mock())
|
||||
|
||||
self.nova.servers.create.side_effect = [_mock_instance(1),
|
||||
MockException("test")]
|
||||
|
||||
self.nova.servers.list.return_value = [_mock_instance(1)]
|
||||
|
||||
deleted_checker.return_value = True
|
||||
|
||||
ops._provision_cluster(cluster.id)
|
||||
|
||||
ctx = context.ctx()
|
||||
cluster_obj = conductor.cluster_get_all(ctx)[0]
|
||||
self.assertEqual(0, len(cluster_obj.node_groups[0].instances))
|
||||
|
||||
|
||||
class NodePlacementTest(AbstractInstanceTest):
|
||||
|
||||
def test_one_node_groups_and_one_affinity_group(self):
|
||||
self.nova.server_groups.create.return_value = mock.Mock(id='123')
|
||||
|
||||
node_groups = [_make_ng_dict('test_group', 'test_flavor',
|
||||
['data node'], 2)]
|
||||
cluster = _create_cluster_mock(node_groups, ["data node"])
|
||||
self.engine._create_instances(cluster)
|
||||
userdata = _generate_user_data_script(cluster)
|
||||
self.nova.servers.create.assert_has_calls(
|
||||
[mock.call("test_cluster-test_group-001",
|
||||
"initial",
|
||||
"test_flavor",
|
||||
scheduler_hints={'group': "123"},
|
||||
userdata=userdata,
|
||||
key_name='user_keypair',
|
||||
security_groups=None,
|
||||
availability_zone=None),
|
||||
mock.call("test_cluster-test_group-002",
|
||||
"initial",
|
||||
"test_flavor",
|
||||
scheduler_hints={'group': "123"},
|
||||
userdata=userdata,
|
||||
key_name='user_keypair',
|
||||
security_groups=None,
|
||||
availability_zone=None)],
|
||||
any_order=False)
|
||||
|
||||
ctx = context.ctx()
|
||||
cluster_obj = conductor.cluster_get_all(ctx)[0]
|
||||
self.assertEqual(2, len(cluster_obj.node_groups[0].instances))
|
||||
|
||||
def test_one_node_groups_and_no_affinity_group(self):
|
||||
self.nova.server_groups.create.return_value = mock.Mock(id='123')
|
||||
|
||||
node_groups = [_make_ng_dict('test_group', 'test_flavor',
|
||||
['data node', 'task tracker'], 2)]
|
||||
|
||||
cluster = _create_cluster_mock(node_groups, [])
|
||||
|
||||
self.engine._create_instances(cluster)
|
||||
userdata = _generate_user_data_script(cluster)
|
||||
|
||||
self.nova.servers.create.assert_has_calls(
|
||||
[mock.call("test_cluster-test_group-001",
|
||||
"initial",
|
||||
"test_flavor",
|
||||
scheduler_hints=None,
|
||||
userdata=userdata,
|
||||
key_name='user_keypair',
|
||||
security_groups=None,
|
||||
availability_zone=None),
|
||||
mock.call("test_cluster-test_group-002",
|
||||
"initial",
|
||||
"test_flavor",
|
||||
scheduler_hints=None,
|
||||
userdata=userdata,
|
||||
key_name='user_keypair',
|
||||
security_groups=None,
|
||||
availability_zone=None)],
|
||||
any_order=False)
|
||||
|
||||
ctx = context.ctx()
|
||||
cluster_obj = conductor.cluster_get_all(ctx)[0]
|
||||
self.assertEqual(2, len(cluster_obj.node_groups[0].instances))
|
||||
|
||||
def test_two_node_groups_and_one_affinity_group(self):
|
||||
self.nova.server_groups.create.return_value = mock.Mock(id='123')
|
||||
|
||||
node_groups = [_make_ng_dict("test_group_1", "test_flavor",
|
||||
["data node", "test tracker"], 2),
|
||||
_make_ng_dict("test_group_2", "test_flavor",
|
||||
["data node", "test tracker"], 1)]
|
||||
|
||||
cluster = _create_cluster_mock(node_groups, ["data node"])
|
||||
self.engine._create_instances(cluster)
|
||||
userdata = _generate_user_data_script(cluster)
|
||||
|
||||
self.nova.servers.create.assert_has_calls(
|
||||
[mock.call('test_cluster-test_group_1-001',
|
||||
"initial",
|
||||
"test_flavor",
|
||||
scheduler_hints={'group': "123"},
|
||||
userdata=userdata,
|
||||
key_name='user_keypair',
|
||||
security_groups=None,
|
||||
availability_zone=None),
|
||||
mock.call('test_cluster-test_group_1-002',
|
||||
"initial",
|
||||
"test_flavor",
|
||||
scheduler_hints={'group': "123"},
|
||||
userdata=userdata,
|
||||
key_name='user_keypair',
|
||||
security_groups=None,
|
||||
availability_zone=None),
|
||||
mock.call('test_cluster-test_group_2-001',
|
||||
"initial",
|
||||
"test_flavor",
|
||||
scheduler_hints={'group': "123"},
|
||||
userdata=userdata,
|
||||
key_name='user_keypair',
|
||||
security_groups=None,
|
||||
availability_zone=None)],
|
||||
any_order=False)
|
||||
|
||||
ctx = context.ctx()
|
||||
cluster_obj = conductor.cluster_get_all(ctx)[0]
|
||||
inst_number = len(cluster_obj.node_groups[0].instances)
|
||||
inst_number += len(cluster_obj.node_groups[1].instances)
|
||||
self.assertEqual(3, inst_number)
|
||||
|
||||
|
||||
class IpManagementTest(AbstractInstanceTest):
|
||||
def setUp(self):
|
||||
super(IpManagementTest, self).setUp()
|
||||
self.engine = e.DirectEngine()
|
||||
|
||||
def test_ip_assignment_use_no_floating(self):
|
||||
self.override_config("use_floating_ips", False)
|
||||
|
||||
node_groups = [_make_ng_dict("test_group_1", "test_flavor",
|
||||
["data node", "test tracker"], 2,
|
||||
'pool'),
|
||||
_make_ng_dict("test_group_2", "test_flavor",
|
||||
["name node", "test tracker"], 1)]
|
||||
|
||||
ctx = context.ctx()
|
||||
cluster = _create_cluster_mock(node_groups, ["data node"])
|
||||
self.engine._create_instances(cluster)
|
||||
|
||||
cluster = conductor.cluster_get(ctx, cluster)
|
||||
instances_list = cluster_utils.get_instances(cluster)
|
||||
|
||||
self.engine._assign_floating_ips(instances_list)
|
||||
|
||||
self.nova.floating_ips.create.assert_has_calls(
|
||||
[mock.call("pool"), mock.call("pool")])
|
||||
|
||||
self.assertEqual(2, self.nova.floating_ips.create.call_count,
|
||||
"Not expected floating IPs number found.")
|
||||
|
||||
|
||||
class ShutdownClusterTest(AbstractInstanceTest):
|
||||
|
||||
@mock.patch('sahara.service.direct_engine.DirectEngine._check_if_deleted')
|
||||
@mock.patch('sahara.service.direct_engine.DirectEngine.'
|
||||
'_map_security_groups')
|
||||
def test_delete_floating_ips(self, map_mock, deleted_checker):
|
||||
node_groups = [_make_ng_dict("test_group_1", "test_flavor",
|
||||
["data node", "test tracker"], 2, 'pool')]
|
||||
map_mock.return_value = []
|
||||
ctx = context.ctx()
|
||||
cluster = _create_cluster_mock(node_groups, ["datanode"])
|
||||
self.engine._create_instances(cluster)
|
||||
|
||||
cluster = conductor.cluster_get(ctx, cluster)
|
||||
instances_list = cluster_utils.get_instances(cluster)
|
||||
|
||||
self.engine._assign_floating_ips(instances_list)
|
||||
|
||||
deleted_checker.return_value = True
|
||||
|
||||
self.engine._shutdown_instances(cluster)
|
||||
self.assertEqual(2, self.nova.floating_ips.delete.call_count,
|
||||
"Not expected floating IPs number found in delete")
|
||||
self.assertEqual(2, self.nova.servers.delete.call_count,
|
||||
"Not expected")
|
||||
|
||||
|
||||
def _make_ng_dict(name, flavor, processes, count, floating_ip_pool=None):
|
||||
ng_dict = {'name': name, 'flavor_id': flavor, 'node_processes': processes,
|
||||
'count': count, 'image_username': 'root'}
|
||||
if floating_ip_pool:
|
||||
ng_dict.update({"floating_ip_pool": floating_ip_pool})
|
||||
return ng_dict
|
||||
|
||||
|
||||
def _create_cluster_mock(node_groups, aa):
|
||||
|
||||
user_kp = mock.Mock()
|
||||
user_kp.public_key = "123"
|
||||
private_key = c.generate_key_pair()[0]
|
||||
|
||||
dct = {'name': 'test_cluster',
|
||||
'plugin_name': 'mock_plugin',
|
||||
'hadoop_version': 'mock_version',
|
||||
'default_image_id': 'initial',
|
||||
'user_keypair_id': 'user_keypair',
|
||||
'anti_affinity': aa,
|
||||
'_user_kp': user_kp,
|
||||
'private_key': private_key,
|
||||
'node_groups': node_groups}
|
||||
|
||||
cluster = conductor.cluster_create(context.ctx(), dct)
|
||||
|
||||
return cluster
|
||||
|
||||
|
||||
def _mock_instance(id):
|
||||
server = mock.Mock()
|
||||
server.id = id
|
||||
server.instance_id = id
|
||||
server.status = 'ACTIVE'
|
||||
server.networks = ["n1", "n2"]
|
||||
server.addresses = {'n1': [{'OS-EXT-IPS:type': 'fixed',
|
||||
'addr': "{0}.{0}.{0}.{0}" .format(id)}],
|
||||
'n2': [{'OS-EXT-IPS:type': 'floating',
|
||||
'addr': "{0}.{0}.{0}.{0}" .format(id)}]}
|
||||
|
||||
server.add_floating_ip.side_effect = [True, True, True]
|
||||
return server
|
||||
|
||||
|
||||
def _mock_ip(id):
|
||||
ip = mock.Mock()
|
||||
ip.id = id
|
||||
ip.ip = "{0}.{0}.{0}.{0}" .format(id)
|
||||
|
||||
return ip
|
||||
|
||||
|
||||
def _mock_instances(count):
|
||||
return [_mock_instance(str(i)) for i in range(1, count + 1)]
|
||||
|
||||
|
||||
def _mock_ips(count):
|
||||
return [_mock_ip(str(i)) for i in range(1, count + 1)]
|
||||
|
||||
|
||||
def _generate_user_data_script(cluster):
|
||||
script_template = """#!/bin/bash
|
||||
echo "%(public_key)s" >> %(user_home)s/.ssh/authorized_keys\n
|
||||
# ====== COMMENT OUT Defaults requiretty in /etc/sudoers ========
|
||||
sed '/^Defaults requiretty*/ s/^/#/' -i /etc/sudoers\n
|
||||
"""
|
||||
return script_template % {
|
||||
"public_key": cluster.management_public_key,
|
||||
"user_home": "/root/"
|
||||
}
|
||||
|
||||
|
||||
def _create_nova_mock(novaclient):
|
||||
nova = mock.Mock()
|
||||
novaclient.return_value = nova
|
||||
nova.servers.create.side_effect = _mock_instances(4)
|
||||
nova.servers.get.return_value = _mock_instance(1)
|
||||
nova.floating_ips.create.side_effect = _mock_ips(4)
|
||||
nova.floating_ips.findall.return_value = _mock_ips(1)
|
||||
nova.floating_ips.delete.side_effect = _mock_deletes(2)
|
||||
images = mock.Mock()
|
||||
images.username = "root"
|
||||
nova.images.get = lambda x: images
|
||||
return nova
|
||||
|
||||
|
||||
def _mock_deletes(count):
|
||||
return [_mock_delete(i) for i in range(1, count + 1)]
|
||||
|
||||
|
||||
def _mock_delete(id):
|
||||
if id == 1:
|
||||
return None
|
||||
return nova_exceptions.NotFound(code=404)
|
||||
|
||||
|
||||
class MockException(Exception):
|
||||
pass
|
Loading…
Reference in New Issue