Decommission of a specific node

Adding the option to decommission a specific node from a cluster.

Partially implements bp: decommission-specific-instance

Change-Id: I1a858fecc1b32f91c76aea6db14e0d5a419211d1
Telles Nobrega 2017-11-22 13:34:46 -03:00
parent 32d4c744f7
commit b001ef2a55
11 changed files with 358 additions and 64 deletions
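
For reference, the v2 scaling request extended by this change accepts an optional 'instances' list per resized node group, naming the exact instances (by name or UUID) to decommission. A minimal illustrative request body, with hypothetical node group and instance names, could look like this (the field layout follows CLUSTER_SCALING_SCHEMA_V2 and the test data further down):

    # Illustrative only: 'workers' and 'cluster-workers-3' are made-up names.
    scale_body = {
        'resize_node_groups': [
            {
                'name': 'workers',                   # existing node group to shrink
                'count': 2,                          # desired count after scaling
                'instances': ['cluster-workers-3'],  # specific instance(s) to remove
            }
        ]
    }

The body is sent to the existing PUT /clusters/<cluster_id> endpoint shown in the first hunk; only the schema used for validation changes.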

View File

@@ -60,7 +60,7 @@ def clusters_create_multiple(data):
@rest.put('/clusters/<cluster_id>')
@acl.enforce("data-processing:clusters:scale")
@v.check_exists(api.get_cluster, 'cluster_id')
@v.validate(v_c_schema.CLUSTER_SCALING_SCHEMA, v_c_s.check_cluster_scaling)
@v.validate(v_c_schema.CLUSTER_SCALING_SCHEMA_V2, v_c_s.check_cluster_scaling)
def clusters_scale(cluster_id, data):
return u.to_wrapped_dict(api.scale_cluster, cluster_id, data)

View File

@@ -54,9 +54,12 @@ def scale_cluster(id, data):
# the next map is the main object we will work with
# to_be_enlarged : {node_group_id: desired_amount_of_instances}
to_be_enlarged = {}
node_group_instance_map = {}
for ng in existing_node_groups:
ng_id = g.find(cluster.node_groups, name=ng['name'])['id']
to_be_enlarged.update({ng_id: ng['count']})
if 'instances' in ng:
node_group_instance_map.update({ng_id: ng['instances']})
additional = construct_ngs_for_scaling(cluster, additional_node_groups)
cluster = conductor.cluster_get(ctx, cluster)
@@ -82,7 +85,8 @@ def scale_cluster(id, data):
if node_group.id not in to_be_enlarged:
to_be_enlarged[node_group.id] = node_group.count
api.OPS.provision_scaled_cluster(id, to_be_enlarged)
api.OPS.provision_scaled_cluster(id, to_be_enlarged,
node_group_instance_map)
return cluster

View File

@@ -84,7 +84,7 @@ class HeatEngine(e.Engine):
for node_group in cluster.node_groups:
conductor.node_group_update(ctx, node_group, {"count": 0})
def scale_cluster(self, cluster, target_count):
def scale_cluster(self, cluster, target_count, instances_to_delete=None):
ctx = context.ctx()
rollback_count = self._get_ng_counts(cluster)
@@ -94,7 +94,8 @@ class HeatEngine(e.Engine):
inst_ids = self._launch_instances(
cluster, target_count, SCALE_STAGES,
update_stack=True, disable_rollback=False)
update_stack=True, disable_rollback=False,
instances_to_delete=instances_to_delete)
cluster = conductor.cluster_get(ctx, cluster)
c_u.clean_cluster_from_empty_ng(cluster)
@@ -209,10 +210,12 @@ class HeatEngine(e.Engine):
@cpo.event_wrapper(
True, step=_('Create Heat stack'), param=('cluster', 1))
def _create_instances(self, cluster, target_count, update_stack=False,
disable_rollback=True):
disable_rollback=True, instances_to_delete=None):
stack = ht.ClusterStack(cluster)
self._update_instance_count(stack, cluster, target_count)
self._update_instance_count(stack, cluster, target_count,
instances_to_delete)
stack.instantiate(update_existing=update_stack,
disable_rollback=disable_rollback)
heat.wait_stack_completion(
@@ -221,12 +224,14 @@ class HeatEngine(e.Engine):
return self._populate_cluster(cluster, stack)
def _launch_instances(self, cluster, target_count, stages,
update_stack=False, disable_rollback=True):
update_stack=False, disable_rollback=True,
instances_to_delete=None):
# create all instances
cluster = c_u.change_cluster_status(cluster, stages[0])
inst_ids = self._create_instances(
cluster, target_count, update_stack, disable_rollback)
cluster, target_count, update_stack, disable_rollback,
instances_to_delete)
# wait for all instances are up and networks ready
cluster = c_u.change_cluster_status(cluster, stages[1])
@@ -246,19 +251,27 @@ class HeatEngine(e.Engine):
return inst_ids
def _update_instance_count(self, stack, cluster, target_count):
def _update_instance_count(self, stack, cluster, target_count,
instances_to_delete=None):
ctx = context.ctx()
instances_name_to_delete = {}
if instances_to_delete:
for instance in instances_to_delete:
node_group_id = instance['node_group']['id']
if node_group_id not in instances_name_to_delete:
instances_name_to_delete[node_group_id] = []
instances_name_to_delete[node_group_id].append(
instance['instance_name'])
for node_group in cluster.node_groups:
count = target_count[node_group.id]
stack.add_node_group_extra(node_group.id, count,
self._generate_user_data_script)
stack.add_node_group_extra(
node_group.id, count, self._generate_user_data_script,
instances_name_to_delete.get(node_group.id, None))
# if number of instances decreases, we need to drop
# the excessive ones
# instances list doesn't order by creating date, so we should
# sort it to make sure deleted instances same as heat deleted.
insts = sorted(
node_group.instances,
key=lambda x: int(x['instance_name'].split('-')[-1]))
for i in range(count, node_group.count):
conductor.instance_remove(ctx, insts[i])
for inst in node_group.instances:
if (instances_to_delete and
node_group.id in instances_name_to_delete):
if (inst.instance_name in
instances_name_to_delete[node_group.id]):
conductor.instance_remove(ctx, inst)

View File

@@ -136,6 +136,10 @@ def _get_wc_waiter_name(inst_name):
return '%s-wc-waiter' % inst_name
def _get_index_from_inst_name(inst_name):
return inst_name.split('-')[-1]
class ClusterStack(object):
def __init__(self, cluster):
self.cluster = cluster
@@ -165,15 +169,16 @@ class ClusterStack(object):
node_group=ng.name, info=self.base_info))
def add_node_group_extra(self, node_group_id, node_count,
gen_userdata_func):
gen_userdata_func, instances_to_delete=None):
self.node_groups_extra[node_group_id] = {
'node_count': node_count,
'gen_userdata_func': gen_userdata_func
'gen_userdata_func': gen_userdata_func,
'instances_to_delete': instances_to_delete
}
def _get_main_template(self):
def _get_main_template(self, instances_to_delete=None):
outputs = {}
resources = self._serialize_resources(outputs)
resources = self._serialize_resources(outputs, instances_to_delete)
return yaml.safe_dump({
"heat_template_version": heat_common.HEAT_TEMPLATE_VERSION,
"description": self.base_info,
@@ -181,8 +186,9 @@ class ClusterStack(object):
"outputs": outputs
})
def instantiate(self, update_existing, disable_rollback=True):
main_tmpl = self._get_main_template()
def instantiate(self, update_existing, disable_rollback=True,
instances_to_delete=None):
main_tmpl = self._get_main_template(instances_to_delete)
kwargs = {
'stack_name': self.cluster.stack_name,
'timeout_mins': 180,
@@ -241,7 +247,7 @@ class ClusterStack(object):
}
}
def _serialize_resources(self, outputs):
def _serialize_resources(self, outputs, instances_to_delete=None):
resources = {}
if self.cluster.anti_affinity:
@@ -250,14 +256,15 @@ class ClusterStack(object):
resources.update(self._serialize_aa_server_group(i))
for ng in self.cluster.node_groups:
resources.update(self._serialize_ng_group(ng, outputs))
resources.update(self._serialize_ng_group(ng, outputs,
instances_to_delete))
for ng in self.cluster.node_groups:
resources.update(self._serialize_auto_security_group(ng))
return resources
def _serialize_ng_group(self, ng, outputs):
def _serialize_ng_group(self, ng, outputs, instances_to_delete=None):
ng_file_name = "file://" + ng.name + ".yaml"
self.files[ng_file_name] = self._serialize_ng_file(ng)
@@ -279,11 +286,19 @@ class ClusterStack(object):
properties[AUTO_SECURITY_GROUP_PARAM_NAME] = {
'get_resource': g.generate_auto_security_group_name(ng)}
removal_policies = []
if self.node_groups_extra[ng.id]['instances_to_delete']:
resource_list = []
for name in self.node_groups_extra[ng.id]['instances_to_delete']:
resource_list.append(_get_index_from_inst_name(name))
removal_policies.append({'resource_list': resource_list})
return {
ng.name: {
"type": "OS::Heat::ResourceGroup",
"properties": {
"count": self.node_groups_extra[ng.id]['node_count'],
"removal_policies": removal_policies,
"resource_def": {
"type": ng_file_name,
"properties": properties

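As a rough sketch of what the hunk above produces: when instances are flagged for deletion, the node group's OS::Heat::ResourceGroup gains a removal_policies entry listing the indices pulled from the doomed instance names (the values below are hypothetical, not taken from the diff):

    # Hypothetical result of _serialize_ng_group for a node group named 'workers'
    # where the instance at index '3' is marked for removal.
    ng_resource = {
        'workers': {
            'type': 'OS::Heat::ResourceGroup',
            'properties': {
                'count': 2,
                'removal_policies': [{'resource_list': ['3']}],
                'resource_def': {
                    'type': 'file://workers.yaml',
                    'properties': {},  # per-instance properties omitted here
                },
            },
        },
    }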
View File

@@ -54,9 +54,11 @@ class LocalOps(object):
context.spawn("cluster-creating-%s" % cluster_id,
_provision_cluster, cluster_id)
def provision_scaled_cluster(self, cluster_id, node_group_id_map):
def provision_scaled_cluster(self, cluster_id, node_group_id_map,
node_group_instance_map=None):
context.spawn("cluster-scaling-%s" % cluster_id,
_provision_scaled_cluster, cluster_id, node_group_id_map)
_provision_scaled_cluster, cluster_id, node_group_id_map,
node_group_instance_map)
def terminate_cluster(self, cluster_id):
context.spawn("cluster-terminating-%s" % cluster_id,
@@ -94,9 +96,11 @@ class RemoteOps(rpc_utils.RPCClient):
def provision_cluster(self, cluster_id):
self.cast('provision_cluster', cluster_id=cluster_id)
def provision_scaled_cluster(self, cluster_id, node_group_id_map):
def provision_scaled_cluster(self, cluster_id, node_group_id_map,
node_group_instance_map=None):
self.cast('provision_scaled_cluster', cluster_id=cluster_id,
node_group_id_map=node_group_id_map)
node_group_id_map=node_group_id_map,
node_group_instance_map=node_group_instance_map)
def terminate_cluster(self, cluster_id):
self.cast('terminate_cluster', cluster_id=cluster_id)
@@ -143,8 +147,10 @@ class OpsServer(rpc_utils.RPCServer):
_provision_cluster(cluster_id)
@request_context
def provision_scaled_cluster(self, cluster_id, node_group_id_map):
_provision_scaled_cluster(cluster_id, node_group_id_map)
def provision_scaled_cluster(self, cluster_id, node_group_id_map,
node_group_instance_map=None):
_provision_scaled_cluster(cluster_id, node_group_id_map,
node_group_instance_map)
@request_context
def terminate_cluster(self, cluster_id):
@@ -314,42 +320,76 @@ def _provision_cluster(cluster_id):
@ops_error_handler(
_("Scaling cluster failed for the following reason(s): {reason}"))
def _provision_scaled_cluster(cluster_id, node_group_id_map):
def _provision_scaled_cluster(cluster_id, node_group_id_map,
node_group_instance_map=None):
ctx, cluster, plugin = _prepare_provisioning(cluster_id)
# Decommissioning surplus nodes with the plugin
cluster = c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_DECOMMISSIONING)
instances_to_delete = []
try:
instances_to_delete = []
for node_group in cluster.node_groups:
new_count = node_group_id_map[node_group.id]
if new_count < node_group.count:
if (node_group_instance_map and
node_group.id in node_group_instance_map):
for instance_ref in node_group_instance_map[
node_group.id]:
instance = _get_instance_obj(node_group.instances,
instance_ref)
instances_to_delete.append(instance)
for node_group in cluster.node_groups:
new_count = node_group_id_map[node_group.id]
if new_count < node_group.count:
instances_to_delete += node_group.instances[new_count:
node_group.count]
while node_group.count - new_count > len(instances_to_delete):
instances_to_delete.append(_get_random_instance_from_ng(
node_group.instances, instances_to_delete))
if instances_to_delete:
context.set_step_type(_("Plugin: decommission cluster"))
plugin.decommission_nodes(cluster, instances_to_delete)
if instances_to_delete:
context.set_step_type(_("Plugin: decommission cluster"))
plugin.decommission_nodes(cluster, instances_to_delete)
# Scaling infrastructure
cluster = c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_SCALING)
context.set_step_type(_("Engine: scale cluster"))
instance_ids = INFRA.scale_cluster(cluster, node_group_id_map)
# Setting up new nodes with the plugin
if instance_ids:
ntp_service.configure_ntp(cluster_id, instance_ids)
# Scaling infrastructure
cluster = c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_CONFIGURING)
instances = c_u.get_instances(cluster, instance_ids)
context.set_step_type(_("Plugin: scale cluster"))
plugin.scale_cluster(cluster, instances)
cluster, c_u.CLUSTER_STATUS_SCALING)
context.set_step_type(_("Engine: scale cluster"))
instance_ids = INFRA.scale_cluster(cluster, node_group_id_map,
instances_to_delete)
# Setting up new nodes with the plugin
if instance_ids:
ntp_service.configure_ntp(cluster_id, instance_ids)
cluster = c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_CONFIGURING)
instances = c_u.get_instances(cluster, instance_ids)
context.set_step_type(_("Plugin: scale cluster"))
plugin.scale_cluster(cluster, instances)
c_u.change_cluster_status(cluster, c_u.CLUSTER_STATUS_ACTIVE)
_refresh_health_for_cluster(cluster_id)
c_u.change_cluster_status(cluster, c_u.CLUSTER_STATUS_ACTIVE)
_refresh_health_for_cluster(cluster_id)
except Exception as e:
c_u.change_cluster_status(cluster, c_u.CLUSTER_STATUS_ACTIVE,
six.text_type(e))
def _get_instance_obj(instances, instance_ref):
for instance in instances:
if (instance.instance_id == instance_ref or
instance.instance_name == instance_ref):
return instance
raise exceptions.NotFoundException(str(instance_ref),
_("Instance %s not found"))
def _get_random_instance_from_ng(instances, instances_to_delete):
# the instances list isn't ordered by creation date, so sort it to
# make sure the instances we delete match the ones Heat deletes.
insts = sorted(instances,
key=lambda x: int(x['instance_name'].split('-')[-1]))
for instance in reversed(insts):
if instance not in instances_to_delete:
return instance
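To illustrate the selection order implemented above: explicitly requested instances are resolved first, and if the node group still has to shrink further, the highest-indexed remaining instances are chosen. The following simplified, self-contained sketch (toy data, not the Sahara objects) mirrors _get_random_instance_from_ng:

    # Toy example with made-up instance names.
    instances = [{'instance_name': 'ng-0'}, {'instance_name': 'ng-1'},
                 {'instance_name': 'ng-2'}, {'instance_name': 'ng-3'}]

    def pick_highest_remaining(insts, chosen):
        # sort by the trailing index, then walk from the newest instance backwards
        ordered = sorted(insts,
                         key=lambda x: int(x['instance_name'].split('-')[-1]))
        for inst in reversed(ordered):
            if inst not in chosen:
                return inst

    to_delete = [instances[1]]       # the user explicitly asked for 'ng-1'
    while 4 - 2 > len(to_delete):    # current count 4, target count 2
        to_delete.append(pick_highest_remaining(instances, to_delete))

    print([i['instance_name'] for i in to_delete])  # ['ng-1', 'ng-3']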
@ops_error_handler(

View File

@@ -17,6 +17,7 @@ import collections
import novaclient.exceptions as nova_ex
from oslo_config import cfg
from oslo_utils import uuidutils
import six
from sahara import conductor as cond
@@ -373,6 +374,40 @@ def check_resize(cluster, r_node_groups):
cluster.hadoop_version,
ng_tmp['node_group_template'])
for scaling_ng in r_node_groups:
current_count = ng_map[scaling_ng['name']].count
new_count = scaling_ng['count']
count_diff = current_count - new_count
if 'instances' in scaling_ng:
if len(scaling_ng['instances']) > count_diff:
raise ex.InvalidDataException(
_("Number of specific instances (%(instance)s) to"
" delete can not be greater than the count difference"
" (%(count)s during scaling")
% {'instance': str(len(scaling_ng['instances'])),
'count': str(count_diff)})
else:
if len(scaling_ng['instances']) > 0:
is_uuid = uuidutils.is_uuid_like(
scaling_ng['instances'][0])
if is_uuid:
for instance in scaling_ng['instances']:
if not uuidutils.is_uuid_like(instance):
raise ex.InvalidReferenceException(
_("You can only reference instances by"
" Name or UUID, not both on the same"
" request"))
else:
for instance in scaling_ng['instances']:
if uuidutils.is_uuid_like(instance):
raise ex.InvalidReferenceException(
_("You can only reference instances by"
" Name or UUID, not both on the same"
" request"))
_check_duplicates(scaling_ng['instances'],
_("Duplicate entry for instances to"
" delete"))
def check_add_node_groups(cluster, add_node_groups):
cluster_ng_names = [ng.name for ng in cluster.node_groups]
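Concretely, the checks added above would reject resize entries like the following hypothetical ones (assuming the 'workers' node group currently has 3 instances): the first because more instances are named than the count actually drops by, the second because instance names and UUID-like references are mixed in one list:

    # Rejected: count only drops by 1 (3 -> 2) but two instances are named.
    bad_by_count = {'name': 'workers', 'count': 2,
                    'instances': ['cluster-workers-2', 'cluster-workers-3']}

    # Rejected: mixes an instance name with a UUID-like reference.
    bad_by_mixing = {'name': 'workers', 'count': 1,
                     'instances': ['cluster-workers-3',
                                   'a7f3d2c4-92b1-4c0e-9d5b-1f2e3a4b5c6d']}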

View File

@@ -134,3 +134,15 @@ CLUSTER_SCALING_SCHEMA = {
}
]
}
CLUSTER_SCALING_SCHEMA_V2 = copy.deepcopy(CLUSTER_SCALING_SCHEMA)
CLUSTER_SCALING_SCHEMA_V2['properties']['resize_node_groups'][
'items']['properties'].update(
{
"instances": {
"type": "array",
"items": {
"type": "string",
},
}
})

View File

@@ -77,6 +77,36 @@ SCALE_DATA = {
]
}
SCALE_DATA_SPECIFIC_INSTANCE = {
'resize_node_groups': [
{
'name': 'ng_1',
'count': 3,
},
{
'name': 'ng_2',
'count': 2,
'instances': ['ng_2_0']
}
],
'add_node_groups': []
}
SCALE_DATA_N_SPECIFIC_INSTANCE = {
'resize_node_groups': [
{
'name': 'ng_1',
'count': 3,
},
{
'name': 'ng_2',
'count': 1,
'instances': ['ng_2_0', 'ng_2_2']
}
],
'add_node_groups': []
}
class FakePlugin(pr_base.ProvisioningPluginBase):
_info = {}

View File

@@ -37,19 +37,54 @@ class FakeOps(object):
def provision_cluster(self, id):
self.calls_order.append('ops.provision_cluster')
cluster = conductor.cluster_get(context.ctx(), id)
target_count = {}
for node_group in cluster.node_groups:
target_count[node_group.id] = node_group.count
for node_group in cluster.node_groups:
conductor.node_group_update(context.ctx(),
node_group, {"count": 0})
for node_group in cluster.node_groups:
for i in range(target_count[node_group.id]):
inst = {
"instance_id": node_group.name + '_' + str(i),
"instance_name": node_group.name + '_' + str(i)
}
conductor.instance_add(context.ctx(), node_group, inst)
conductor.cluster_update(
context.ctx(), id, {'status': c_u.CLUSTER_STATUS_ACTIVE})
def provision_scaled_cluster(self, id, to_be_enlarged):
def provision_scaled_cluster(self, id, to_be_enlarged,
node_group_instance_map=None):
self.calls_order.append('ops.provision_scaled_cluster')
cluster = conductor.cluster_get(context.ctx(), id)
# Set scaled to see difference between active and scaled
for (ng, count) in six.iteritems(to_be_enlarged):
instances_to_delete = []
if node_group_instance_map:
if ng in node_group_instance_map:
instances_to_delete = self._get_instance(
cluster, node_group_instance_map[ng])
for instance in instances_to_delete:
conductor.instance_remove(context.ctx(), instance)
conductor.node_group_update(context.ctx(), ng, {'count': count})
conductor.cluster_update(context.ctx(), id, {'status': 'Scaled'})
def terminate_cluster(self, id):
self.calls_order.append('ops.terminate_cluster')
def _get_instance(self, cluster, instances_to_delete):
instances = []
for node_group in cluster.node_groups:
for instance in node_group.instances:
if instance.instance_id in instances_to_delete:
instances.append(instance)
return instances
class TestClusterApi(base.SaharaWithDbTestCase):
def setUp(self):
@@ -134,6 +169,7 @@ class TestClusterApi(base.SaharaWithDbTestCase):
@mock.patch('sahara.service.quotas.check_scaling', return_value=None)
def test_scale_cluster_success(self, check_scaling, check_cluster):
cluster = api.create_cluster(api_base.SAMPLE_CLUSTER)
cluster = api.get_cluster(cluster.id)
api.scale_cluster(cluster.id, api_base.SCALE_DATA)
result_cluster = api.get_cluster(cluster.id)
self.assertEqual('Scaled', result_cluster.status)
@@ -156,6 +192,46 @@ class TestClusterApi(base.SaharaWithDbTestCase):
'ops.provision_scaled_cluster',
'ops.terminate_cluster'], self.calls_order)
@mock.patch('sahara.service.quotas.check_cluster', return_value=None)
@mock.patch('sahara.service.quotas.check_scaling', return_value=None)
def test_scale_cluster_n_specific_instances_success(self, check_scaling,
check_cluster):
cluster = api.create_cluster(api_base.SAMPLE_CLUSTER)
cluster = api.get_cluster(cluster.id)
api.scale_cluster(cluster.id, api_base.SCALE_DATA_N_SPECIFIC_INSTANCE)
result_cluster = api.get_cluster(cluster.id)
self.assertEqual('Scaled', result_cluster.status)
expected_count = {
'ng_1': 3,
'ng_2': 1,
'ng_3': 1,
}
ng_count = 0
for ng in result_cluster.node_groups:
self.assertEqual(expected_count[ng.name], ng.count)
ng_count += 1
self.assertEqual(1, result_cluster.node_groups[1].count)
self.assertNotIn('ng_2_0',
self._get_instances_ids(
result_cluster.node_groups[1]))
self.assertNotIn('ng_2_2',
self._get_instances_ids(
result_cluster.node_groups[1]))
self.assertEqual(3, ng_count)
api.terminate_cluster(result_cluster.id)
self.assertEqual(
['get_open_ports', 'recommend_configs', 'validate',
'ops.provision_cluster', 'get_open_ports',
'recommend_configs', 'validate_scaling',
'ops.provision_scaled_cluster',
'ops.terminate_cluster'], self.calls_order)
def _get_instances_ids(self, node_group):
instance_ids = []
for instance in node_group.instances:
instance_ids.append(instance.instance_id)
return instance_ids
@mock.patch('sahara.service.quotas.check_cluster', return_value=None)
@mock.patch('sahara.service.quotas.check_scaling', return_value=None)
def test_scale_cluster_failed(self, check_scaling, check_cluster):

View File

@@ -30,7 +30,8 @@ class FakeCluster(object):
class FakeNodeGroup(object):
id = 'id'
count = 2
instances = [1, 2]
instances = [{'instance_name': 'id-10', 'id': 2},
{'instance_name': 'id-2', 'id': 1}]
class FakePlugin(mock.Mock):
@@ -52,7 +53,8 @@ class FakePlugin(mock.Mock):
def decommission_nodes(self, cluster, instances_to_delete):
TestOPS.SEQUENCE.append('decommission_nodes')
def scale_cluster(self, cluster, node_group_id_map):
def scale_cluster(self, cluster, node_group_id_map,
node_group_instance_map=None):
TestOPS.SEQUENCE.append('plugin.scale_cluster')
def cluster_destroy(self, ctx, cluster):
@@ -63,7 +65,8 @@ class FakeINFRA(object):
def create_cluster(self, cluster):
TestOPS.SEQUENCE.append('create_cluster')
def scale_cluster(self, cluster, node_group_id_map):
def scale_cluster(self, cluster, node_group_id_map,
node_group_instance_map=None):
TestOPS.SEQUENCE.append('INFRA.scale_cluster')
return True

View File

@@ -211,6 +211,23 @@ class TestScalingValidation(u.ValidationTestCase):
self.assertEqual(1, req_data.call_count)
self._assert_calls(bad_req, bad_req_i)
@mock.patch("sahara.utils.api.request_data")
@mock.patch("sahara.utils.api.bad_request")
def _assert_cluster_scaling_validation_v2(self,
bad_req=None,
req_data=None,
data=None,
bad_req_i=None):
m_func = mock.Mock()
m_func.__name__ = "m_func"
req_data.return_value = data
v.validate(c_schema.CLUSTER_SCALING_SCHEMA_V2,
self._create_object_fun)(m_func)(data=data,
cluster_id='42')
self.assertEqual(1, req_data.call_count)
self._assert_calls(bad_req, bad_req_i)
@mock.patch("sahara.service.api.OPS")
def test_cluster_scaling_scheme_v_resize_ng(self, ops):
ops.get_engine_type_and_version.return_value = "direct.1.1"
@@ -259,6 +276,55 @@ class TestScalingValidation(u.ValidationTestCase):
u"allowed ('flavor_id' was unexpected)")
)
@mock.patch("sahara.service.api.OPS")
def test_cluster_scaling_scheme_v_resize_ng_v2(self, ops):
ops.get_engine_type_and_version.return_value = "direct.1.1"
self._create_object_fun = mock.Mock()
data = {
}
self._assert_cluster_scaling_validation_v2(
data=data,
bad_req_i=(1, 'VALIDATION_ERROR',
u'{} is not valid under any of the given schemas')
)
data = {
'resize_node_groups': [{}]
}
self._assert_cluster_scaling_validation_v2(
data=data,
bad_req_i=(1, 'VALIDATION_ERROR',
u"resize_node_groups[0]: 'name' is a required property")
)
data = {
'resize_node_groups': [
{
'name': 'a'
}
]
}
self._assert_cluster_scaling_validation_v2(
data=data,
bad_req_i=(1, 'VALIDATION_ERROR',
u"resize_node_groups[0]: 'count' is a required "
u"property")
)
data = {
'resize_node_groups': [
{
'name': 'a',
'flavor_id': '42',
'instances': ['id1'],
'count': 2
}
]
}
self._assert_cluster_scaling_validation_v2(
data=data,
bad_req_i=(1, 'VALIDATION_ERROR',
u"resize_node_groups[0]: Additional properties are not "
u"allowed ('flavor_id' was unexpected)")
)
@mock.patch("sahara.service.api.OPS")
def test_cluster_scaling_validation_add_ng(self, ops):
ops.get_engine_type_and_version.return_value = "direct.1.1"