# sahara/sahara/service/heat/heat_engine.py
#
# 247 lines
# 8.9 KiB
# Python

# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from heatclient import exc as heat_exc
from oslo_config import cfg
from oslo_log import log as logging
from sahara import conductor as c
from sahara import context
from sahara.i18n import _
from sahara.i18n import _LW
from sahara.service import engine as e
from sahara.service.heat import templates as ht
from sahara.service import volumes
from sahara.utils import cluster as c_u
from sahara.utils import cluster_progress_ops as cpo
from sahara.utils.openstack import base as b
from sahara.utils.openstack import heat
# Conductor API handle used below to read and update cluster, node group
# and instance records.
conductor = c.API
CONF = cfg.CONF
LOG = logging.getLogger(__name__)

# Status sequences passed to HeatEngine._launch_instances, which applies
# them in order: [0] while the Heat stack is created/updated, [1] while
# waiting for instance networks, [2] while preparing (volume mounting and
# configuration of) the instances.
CREATE_STAGES = [c_u.CLUSTER_STATUS_SPAWNING, c_u.CLUSTER_STATUS_WAITING,
                 c_u.CLUSTER_STATUS_PREPARING]

SCALE_STAGES = [c_u.CLUSTER_STATUS_SCALING_SPAWNING,
                c_u.CLUSTER_STATUS_SCALING_WAITING,
                c_u.CLUSTER_STATUS_SCALING_PREPARING]

# NOTE(review): the double underscore in ROLLBACK__PREPARING is presumably
# the exact name defined in sahara.utils.cluster — confirm there before
# "fixing" the spelling here.
ROLLBACK_STAGES = [c_u.CLUSTER_STATUS_ROLLBACK_SPAWNING,
                   c_u.CLUSTER_STATUS_ROLLBACK_WAITING,
                   c_u.CLUSTER_STATUS_ROLLBACK__PREPARING]

# Options registered on the global CONF at import time. 'heat_stack_tags'
# is not read in this file; presumably it is consumed when the Heat stack
# is built — confirm.
heat_engine_opts = [
    cfg.ListOpt('heat_stack_tags', default=['data-processing-cluster'],
                help="List of tags to be used during operating with stack.")
]

CONF.register_opts(heat_engine_opts)
class HeatEngine(e.Engine):
    """Infrastructure engine that provisions cluster instances via Heat.

    All node group instances are described by one stack built with
    ``ht.ClusterStack``. Creating, scaling and rolling back a cluster all
    come down to instantiating (or updating) that stack with new
    per-node-group instance counts and then registering the resulting
    instances through the conductor.
    """

    def get_type_and_version(self):
        """Return the engine identifier string ``"heat.3.0"``."""
        return "heat.3.0"

    def create_cluster(self, cluster):
        """Launch every instance of a newly created cluster.

        While the launch is in progress the rollback strategy is
        "shutdown", so ``rollback_cluster`` would tear the whole cluster
        down. The requested node group counts are captured as the launch
        target, the stored counts are reset to zero, and the rollback info
        is cleared once the launch completes.

        :param cluster: cluster whose node groups hold the requested counts.
        """
        self._update_rollback_strategy(cluster, shutdown=True)

        target_count = self._get_ng_counts(cluster)
        # NOTE(review): counts are presumably rebuilt as instances are
        # registered through conductor.instance_add in _populate_cluster —
        # confirm against the conductor implementation.
        self._nullify_ng_counts(cluster)

        self._launch_instances(cluster, target_count, CREATE_STAGES)

        self._update_rollback_strategy(cluster)

    def _get_ng_counts(self, cluster):
        """Return a dict mapping each node group id to its current count."""
        return {node_group.id: node_group.count
                for node_group in cluster.node_groups}

    def _nullify_ng_counts(self, cluster):
        """Set the stored count of every node group of ``cluster`` to 0."""
        ctx = context.ctx()
        for node_group in cluster.node_groups:
            conductor.node_group_update(ctx, node_group, {"count": 0})

    def scale_cluster(self, cluster, target_count):
        """Resize the node groups of an existing cluster via a stack update.

        :param cluster: cluster to scale.
        :param target_count: dict mapping node group id to the desired
            number of instances.
        :returns: ids (as returned by ``conductor.instance_add``) of the
            instances registered during scaling.
        """
        ctx = context.ctx()

        # Remember both the pre-scaling and the requested counts so that
        # rollback_cluster can undo a failed scaling.
        rollback_count = self._get_ng_counts(cluster)
        self._update_rollback_strategy(cluster, rollback_count=rollback_count,
                                       target_count=target_count)

        inst_ids = self._launch_instances(
            cluster, target_count, SCALE_STAGES,
            update_stack=True, disable_rollback=False)

        # Re-read the cluster, then let c_u.clean_cluster_from_empty_ng
        # deal with node groups left empty (presumably removes them).
        cluster = conductor.cluster_get(ctx, cluster)
        c_u.clean_cluster_from_empty_ng(cluster)

        self._update_rollback_strategy(cluster)

        return inst_ids

    def rollback_cluster(self, cluster, reason):
        """Undo a failed cluster creation or scaling.

        The strategy recorded in ``cluster.rollback_info`` decides the
        action. A creation rollback shuts the cluster down, and a scaling
        rollback removes the instances added by the scaling. The stored
        rollback info is cleared before either action runs.

        :param cluster: cluster being rolled back.
        :param reason: failure description, included in the warning log.
        :returns: True after a scaling rollback; False after a creation
            rollback or when no rollback info was recorded.
        """
        rollback_info = cluster.rollback_info or {}
        self._update_rollback_strategy(cluster)

        if rollback_info.get('shutdown', False):
            self._rollback_cluster_creation(cluster, reason)
            LOG.warning(_LW("Cluster creation rollback "
                            "(reason: {reason})").format(reason=reason))
            return False

        # Copies, because _rollback_cluster_scaling mutates rollback_count.
        rollback_count = rollback_info.get('rollback_count', {}).copy()
        target_count = rollback_info.get('target_count', {}).copy()
        if rollback_count or target_count:
            self._rollback_cluster_scaling(
                cluster, rollback_count, target_count, reason)
            LOG.warning(_LW("Cluster scaling rollback "
                            "(reason: {reason})").format(reason=reason))
            return True

        return False

    def _update_rollback_strategy(self, cluster, shutdown=False,
                                  rollback_count=None, target_count=None):
        """Overwrite ``cluster.rollback_info`` with the given strategy.

        Only truthy arguments are stored, so calling this with the defaults
        clears the rollback info.

        :returns: the cluster as returned by ``conductor.cluster_update``.
        """
        rollback_info = {}

        if shutdown:
            rollback_info['shutdown'] = shutdown

        if rollback_count:
            rollback_info['rollback_count'] = rollback_count

        if target_count:
            rollback_info['target_count'] = target_count

        cluster = conductor.cluster_update(
            context.ctx(), cluster, {'rollback_info': rollback_info})
        return cluster

    def _populate_cluster(self, cluster, stack):
        """Register stack instances that the cluster does not know yet.

        :param cluster: cluster whose existing instances are skipped.
        :param stack: ``ht.ClusterStack`` providing the per-node-group
            instances (``physical_id`` and ``name``).
        :returns: list of ids returned by ``conductor.instance_add`` for
            the newly registered instances.
        """
        ctx = context.ctx()
        # A set keeps the membership test below O(1) per stack instance.
        old_ids = {i.instance_id for i in c_u.get_instances(cluster)}
        new_ids = []

        for node_group in cluster.node_groups:
            instances = stack.get_node_group_instances(node_group)
            for instance in instances:
                nova_id = instance['physical_id']
                name = instance['name']
                if nova_id not in old_ids:
                    instance_id = conductor.instance_add(
                        ctx, node_group, {"instance_id": nova_id,
                                          "instance_name": name})
                    new_ids.append(instance_id)

        return new_ids

    def _rollback_cluster_creation(self, cluster, ex):
        """Shutdown all instances and update cluster status.

        ``ex`` (the rollback reason) is accepted but not used here.
        """
        self.shutdown_cluster(cluster)

    def _rollback_cluster_scaling(self, cluster, rollback_count,
                                  target_count, ex):
        """Attempt to rollback cluster scaling.

        Our rollback policy for scaling is as follows:
        We shut down nodes created during scaling, but we don't try to
        get back decommissioned nodes. I.e. during the rollback
        we only shut down nodes and not launch them. That approach should
        maximize the chance of rollback success.

        :param rollback_count: node group id -> count before scaling;
            mutated in place.
        :param target_count: node group id -> count requested by scaling.
        :param ex: rollback reason; not used here.
        """
        # For node groups that were shrunk, keep the smaller (target) count
        # so no decommissioned node is relaunched; for grown node groups the
        # pre-scaling count removes the newly added nodes.
        for ng in rollback_count:
            if rollback_count[ng] > target_count[ng]:
                rollback_count[ng] = target_count[ng]

        self._launch_instances(cluster, rollback_count, ROLLBACK_STAGES,
                               update_stack=True)

    def shutdown_cluster(self, cluster):
        """Shutdown specified cluster and all related resources.

        Deletes the Heat stack named after the cluster and waits for the
        deletion to finish. If Heat reports the stack as not found, the
        instances and anti-affinity server group are deleted directly
        instead. Job executions and DB objects are cleaned up in both cases.
        """
        try:
            b.execute_with_retries(heat.client().stacks.delete, cluster.name)
            stack = heat.get_stack(cluster.name)
            heat.wait_stack_completion(stack)
        except heat_exc.HTTPNotFound:
            LOG.warning(_LW('Did not find stack for cluster. Trying to delete '
                            'cluster manually.'))

            # Stack not found. Trying to delete cluster like direct engine
            # do it
            self._shutdown_instances(cluster)
            self._delete_aa_server_group(cluster)

        self._clean_job_executions(cluster)
        self._remove_db_objects(cluster)

    @cpo.event_wrapper(
        True, step=_('Create Heat stack'), param=('cluster', 1))
    def _create_instances(self, cluster, target_count, update_stack=False,
                          disable_rollback=True):
        """Create or update the cluster stack and register its instances.

        :param target_count: node group id -> desired instance count.
        :param update_stack: update the existing stack instead of creating.
        :param disable_rollback: forwarded to ``stack.instantiate``.
        :returns: ids of the instances newly registered for the cluster.
        """
        stack = ht.ClusterStack(cluster)

        self._update_instance_count(stack, cluster, target_count)
        stack.instantiate(update_existing=update_stack,
                          disable_rollback=disable_rollback)
        heat.wait_stack_completion(
            stack.heat_stack,
            is_update=update_stack, last_updated_time=stack.last_updated_time)

        return self._populate_cluster(cluster, stack)

    def _launch_instances(self, cluster, target_count, stages,
                          update_stack=False, disable_rollback=True):
        """Run the three launch stages, switching cluster status per stage.

        :param stages: three cluster statuses applied, in order, to the
            stack, network-wait and preparation stages.
        :returns: ids of the instances newly registered for the cluster.
        """
        # create all instances
        cluster = c_u.change_cluster_status(cluster, stages[0])

        inst_ids = self._create_instances(
            cluster, target_count, update_stack, disable_rollback)

        # wait until all instances are up and networks are ready
        cluster = c_u.change_cluster_status(cluster, stages[1])

        instances = c_u.get_instances(cluster, inst_ids)

        self._await_networks(cluster, instances)

        # prepare all instances
        cluster = c_u.change_cluster_status(cluster, stages[2])

        instances = c_u.get_instances(cluster, inst_ids)

        volumes.mount_to_instances(instances)

        self._configure_instances(cluster)

        return inst_ids

    def _update_instance_count(self, stack, cluster, target_count):
        """Set each node group's stack count and drop surplus instances.

        :param stack: ``ht.ClusterStack`` being prepared.
        :param target_count: node group id -> desired instance count; must
            contain every node group of ``cluster``.
        """
        ctx = context.ctx()
        for node_group in cluster.node_groups:
            count = target_count[node_group.id]
            stack.add_node_group_extra(node_group.id, count,
                                       self._generate_user_data_script)

            # if number of instances decreases, we need to drop
            # the excessive ones
            for i in range(count, node_group.count):
                conductor.instance_remove(ctx, node_group.instances[i])