Added methods for patching deployment info per object

Instead of the wide callbacks process_deployment and
process_provisioning, implemented methods to patch deployment or
provisioning info per cluster or node.
The code of extensions and tests was updated accordingly.
Also added a helper to mark methods of an extension as deprecated.
The extension load behaviour was modified: instead of failing the
operation when an extension cannot be loaded, nailgun only writes an
error to the log saying the extension was not loaded and continues
operation.

Partial-Bug: 1596987
Change-Id: I577c8ffc105734e12646ca7c6a4fe4927e70b119
DocImpact
Bulat Gaifullin 2016-07-13 20:09:39 +03:00
parent 55164a0543
commit 5167527dc4
15 changed files with 350 additions and 465 deletions
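For extension authors, the migration looks roughly like the sketch below: a pipeline now patches the serialized data in place, per cluster and per node, instead of receiving and returning the whole deployment payload. This is an illustrative sketch, not code from the commit; the extension name, the patched keys, and the import paths are assumptions.

# Hypothetical extension built against the new per-object hooks.
from nailgun.extensions import BaseExtension
from nailgun.extensions import BasePipeline


class ExamplePipeline(BasePipeline):
    @classmethod
    def process_deployment_for_cluster(cls, cluster, cluster_data):
        # patch the shared (cluster-wide) deployment info in place;
        # nothing is returned, unlike the old process_deployment
        cluster_data.setdefault('example', {})['cluster_id'] = cluster.id

    @classmethod
    def process_deployment_for_node(cls, node, node_data):
        # patch the per-node deployment info in place
        node_data.setdefault('example', {})['node_uid'] = node.uid


class ExampleExtension(BaseExtension):
    name = 'example'
    version = '1.0.0'
    description = 'illustrative extension'
    data_pipelines = (ExamplePipeline,)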

View File

@ -37,8 +37,12 @@ from nailgun.extensions.manager import \
from nailgun.extensions.manager import \
    fire_callback_on_cluster_patch_attributes
from nailgun.extensions.manager import \
    fire_callback_on_deployment_data_serialization
    fire_callback_on_cluster_serialization_for_deployment
from nailgun.extensions.manager import \
    fire_callback_on_provisioning_data_serialization
    fire_callback_on_node_serialization_for_deployment
from nailgun.extensions.manager import \
    fire_callback_on_cluster_serialization_for_provisioning
from nailgun.extensions.manager import \
    fire_callback_on_node_serialization_for_provisioning
from nailgun.extensions.manager import node_extension_call
from nailgun.extensions.manager import setup_yaql_context

View File

@ -14,30 +14,108 @@
# License for the specific language governing permissions and limitations
# under the License.

"Contains base class for Nailgun extensions"
# Contains base class for Nailgun extensions

import abc

import six


class DeprecationController(type):
    @staticmethod
    def mark_as_deprecated(obj, details):
        obj.__deprecated__ = details

    @staticmethod
    def is_deprecated(obj):
        return hasattr(obj, '__deprecated__')

    def __new__(mcls, name, bases, namespace):
        cls = super(DeprecationController, mcls).__new__(
            mcls, name, bases, namespace
        )
        deprecated_methods = set()
        for base in bases:
            for name in dir(base):
                value = getattr(base, name, None)
                # if a method from a base class has the attribute
                # '__deprecated__' and the method from the current class
                # does not, the method was overridden in the current class
                # and the user needs to be informed that it was removed
                if (
                    DeprecationController.is_deprecated(value) and
                    not DeprecationController.is_deprecated(getattr(cls, name))
                ):
                    deprecated_methods.add((base, value))
        if deprecated_methods:
            raise RuntimeError(
                '\n'.join(
                    "{0}.{1} was removed. {2}.".format(
                        c.__name__, m.__name__, m.__deprecated__
                    ) for c, m in deprecated_methods
                )
            )
        return cls


def deprecated(instructions):
    def wrapper(func):
        DeprecationController.mark_as_deprecated(func, instructions)
        return func
    return wrapper


@six.add_metaclass(DeprecationController)
class BasePipeline(object):
    @classmethod
    def process_deployment_for_cluster(cls, cluster, cluster_data):
        """Extend or modify deployment data for cluster.

        :param cluster_data: serialized data for cluster
        :param cluster: the instance of Cluster
        """

    @classmethod
    def process_deployment_for_node(cls, node, node_data):
        """Extend or modify deployment data for node.

        :param node_data: serialized data for node
        :param node: the instance of Node
        """

    @classmethod
    def process_provisioning_for_cluster(cls, cluster, cluster_data):
        """Extend or modify provisioning data for cluster.

        :param cluster: the instance of Cluster
        :param cluster_data: serialized data for cluster
        """

    @classmethod
    def process_provisioning_for_node(cls, node, node_data):
        """Extend or modify provisioning data for node.

        :param node: the instance of Node
        :param node_data: serialized data for node
        """

    @classmethod
    @deprecated("Please implement methods process_deployment_for_* instead")
    def process_deployment(cls, deployment_data, cluster, nodes, **kwargs):
        """Change the deployment_data.

        :param deployment_data: serialized data
        """
        return deployment_data

    @classmethod
    @deprecated("Please implement methods process_provisioning_for_* instead")
    def process_provisioning(cls, provisioning_data, cluster, nodes, **kwargs):
        """Change the provisioning_data.

        :param provisioning_data: serialized data
        """
        return provisioning_data


@six.add_metaclass(abc.ABCMeta)

@ -158,7 +236,7 @@ class BaseExtension(object):
        """Callback which gets executed when cluster is deleted"""

    @classmethod
    def on_before_deployment_check(cls, cluster):
    def on_before_deployment_check(cls, cluster, nodes):
        """Callback which gets executed when "before deployment check" runs"""

    @classmethod

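The metaclass enforces the deprecation at class-creation time: a pipeline subclass that still overrides one of the wide callbacks fails as soon as its class body is evaluated. A minimal sketch, assuming BasePipeline is in scope as in the tests below:

try:
    class LegacyPipeline(BasePipeline):
        @classmethod
        def process_deployment(cls, deployment_data, cluster, nodes,
                               **kwargs):
            return deployment_data
except RuntimeError as exc:
    # DeprecationController.__new__ raises, naming the removed method and
    # the migration hint from the @deprecated decorator:
    # "BasePipeline.process_deployment was removed. Please implement
    # methods process_deployment_for_* instead."
    print(exc)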
View File

@ -21,16 +21,10 @@ from stevedore.extension import ExtensionManager
from nailgun import errors
from nailgun.extensions import consts
from nailgun.logger import logger

_EXTENSION_MANAGER = None


def on_load_failure(manager, endpoint, exc):
    logger.exception("Failed to load %s extension", endpoint.name)
    raise


def get_all_extensions():
    """Retrieves all available extensions for Nailgun

@ -43,8 +37,8 @@ def get_all_extensions():
    if _EXTENSION_MANAGER is None:
        _EXTENSION_MANAGER = ExtensionManager(
            on_load_failure_callback=on_load_failure,
            namespace=consts.EXTENSIONS_NAMESPACE)
            namespace=consts.EXTENSIONS_NAMESPACE
        )
    return (ext.plugin for ext in _EXTENSION_MANAGER.extensions)

@ -172,9 +166,9 @@ def fire_callback_on_cluster_delete(cluster):
        extension.on_cluster_delete(cluster)


def fire_callback_on_before_deployment_check(cluster):
def fire_callback_on_before_deployment_check(cluster, nodes=None):
    for extension in get_all_extensions():
        extension.on_before_deployment_check(cluster)
        extension.on_before_deployment_check(cluster, nodes or cluster.nodes)


def fire_callback_on_before_deployment_serialization(cluster, nodes,

@ -199,17 +193,21 @@ def _collect_data_pipelines_for_cluster(cluster):
        if e.name in extensions)


def fire_callback_on_deployment_data_serialization(data, cluster, nodes,
                                                   **kwargs):
def fire_callback_on_node_serialization_for_deployment(node, node_data):
    for pipeline in _collect_data_pipelines_for_cluster(node.cluster):
        pipeline.process_deployment_for_node(node, node_data)


def fire_callback_on_node_serialization_for_provisioning(node, node_data):
    for pipeline in _collect_data_pipelines_for_cluster(node.cluster):
        pipeline.process_provisioning_for_node(node, node_data)


def fire_callback_on_cluster_serialization_for_deployment(cluster, data):
    for pipeline in _collect_data_pipelines_for_cluster(cluster):
        data = pipeline.process_deployment(data, cluster, nodes, **kwargs)
    return data
        pipeline.process_deployment_for_cluster(cluster, data)


def fire_callback_on_provisioning_data_serialization(data, cluster, nodes,
                                                     **kwargs):
def fire_callback_on_cluster_serialization_for_provisioning(cluster, data):
    for pipeline in _collect_data_pipelines_for_cluster(cluster):
        data = pipeline.process_provisioning(data, cluster, nodes, **kwargs)
    return data
        pipeline.process_provisioning_for_cluster(cluster, data)
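Note the changed contract here: the removed fire_callback_on_*_data_serialization helpers returned (possibly replaced) data, while the new per-object helpers mutate the passed dict in place and return nothing. A caller-side sketch, with illustrative names:

common_attrs = serializer.get_common_attrs(cluster)  # a plain dict
fire_callback_on_cluster_serialization_for_deployment(cluster, common_attrs)
# common_attrs is patched in place; there is no return value to reassign

for node in cluster.nodes:
    node_data = serializer.serialize_node(common_attrs, node, 'controller')
    fire_callback_on_node_serialization_for_deployment(node, node_data)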

View File

@ -42,7 +42,7 @@ class UpdateDnsmasqTask(object):
            'parameters': {
                'provisioning_info':
                    provisioning_serializers.ProvisioningSerializer.
                    serialize_cluster_info(None, None)
                    serialize_cluster(None, None)
            }
        }]
    }

View File

@ -67,106 +67,66 @@ class VolumeObjectMethodsMixin(object):


class NodeVolumesPipeline(VolumeObjectMethodsMixin, BasePipeline):

    @classmethod
    def process_provisioning(cls, provisioning_data, cluster, nodes, **kwargs):
        nodes_db = {node.id: node for node in nodes}
        for node in provisioning_data['nodes']:
            volumes = cls.get_node_volumes(nodes_db[int(node['uid'])])
            node['ks_meta']['pm_data']['ks_spaces'] = volumes
        return provisioning_data
    MIN_SUPPORTED_VERSION = StrictVersion("8.0")

    @classmethod
    def process_deployment(cls, deployment_data, cluster, nodes, **kwargs):
        if (StrictVersion(cluster.release.environment_version) >=
                StrictVersion('8.0')):
    def process_provisioning_for_node(cls, node, node_data):
        """Adds node volumes to provisioning info."""
        ks_meta = node_data.setdefault('ks_meta', {})
        pm_data = ks_meta.setdefault('pm_data', {})
        pm_data['ks_spaces'] = cls.get_node_volumes(node) or []
            nodes_wo_master = six.moves.filter(
                lambda n: n['uid'] != consts.MASTER_NODE_UID,
                deployment_data)
            nodes_dict = {int(node['uid']): node for node in nodes_wo_master}
            for node in nodes:
                volumes = cls.get_node_volumes(node)
                nodes_dict[node.id]['node_volumes'] = volumes
        return deployment_data

    @classmethod
    def process_deployment_for_node(cls, node, node_data):
        """Adds node volumes to deployment info."""
        version = StrictVersion(node.cluster.release.environment_version)
        if version >= cls.MIN_SUPPORTED_VERSION:
            node_data['node_volumes'] = cls.get_node_volumes(node) or []


class PgCountPipeline(VolumeObjectMethodsMixin, BasePipeline):

    @classmethod
    def process_deployment(cls, deployment_data, cluster, nodes, **kwargs):
        cls._set_pg_count_storage_parameters(deployment_data, nodes)
        return deployment_data

    @classmethod
    def _set_pg_count_storage_parameters(cls, data, nodes):
        """Generate pg_num

        pg_num is generated as the number of OSDs across the cluster
        multiplied by 100, divided by Ceph replication factor, and
        rounded up to the nearest power of 2.
        """
    def process_deployment_for_cluster(cls, cluster, cluster_data):
        """Adds Ceph-related information to deployment info for cluster."""
        all_nodes = {n.uid: n for n in cluster.nodes}
        osd_num = 0
        osd_nodes = [node for node in nodes
                     if 'ceph-osd' in node.all_roles]
        for n in cluster_data['nodes']:
            if 'ceph-osd' in (n.get('roles') or [n.get('role')]):
                volumes = cls.get_node_volumes(all_nodes[n['uid']]) or []
                for volume in volumes:
                    for part in volume.get('volumes', []):
                        if (part.get('name') == 'ceph' and
                                part.get('size', 0) > 0):
                            osd_num += 1
        for node in osd_nodes:
            for disk in cls.get_node_volumes(node):
                for part in disk.get('volumes', []):
                    if part.get('name') == 'ceph' and part.get('size', 0) > 0:
                        osd_num += 1
        storage_attrs = cluster_data.setdefault('storage', {})
        pg_counts = get_pool_pg_count(
            osd_num=osd_num,
            pool_sz=int(storage_attrs['osd_pool_size']),
            ceph_version='firefly',
            volumes_ceph=storage_attrs['volumes_ceph'],
            objects_ceph=storage_attrs['objects_ceph'],
            ephemeral_ceph=storage_attrs['ephemeral_ceph'],
            images_ceph=storage_attrs['images_ceph'],
            emulate_pre_7_0=False)
        for node in data:
            storage_attrs = node['storage']
            pg_counts = get_pool_pg_count(
                osd_num=osd_num,
                pool_sz=int(storage_attrs['osd_pool_size']),
                ceph_version='firefly',
                volumes_ceph=storage_attrs['volumes_ceph'],
                objects_ceph=storage_attrs['objects_ceph'],
                ephemeral_ceph=storage_attrs['ephemeral_ceph'],
                images_ceph=storage_attrs['images_ceph'],
                emulate_pre_7_0=False)
            # Log {pool_name: pg_count} mapping
            pg_str = ", ".join(map("{0[0]}={0[1]}".format, pg_counts.items()))
            logger.debug("Ceph: PG values {%s}", pg_str)
            storage_attrs['pg_num'] = pg_counts['default_pg_num']
            storage_attrs['per_pool_pg_nums'] = pg_counts
        # Log {pool_name: pg_count} mapping
        pg_str = ", ".join(map("{0[0]}={0[1]}".format, pg_counts.items()))
        logger.debug("Ceph: PG values {%s}", pg_str)
        storage_attrs['pg_num'] = pg_counts['default_pg_num']
        storage_attrs['per_pool_pg_nums'] = pg_counts


class SetImageCacheMaxSizePipeline(VolumeObjectMethodsMixin, BasePipeline):

    @classmethod
    def process_deployment(cls, deployment_data, cluster, nodes, **kwargs):
        nodes_wo_master = six.moves.filter(
            lambda n: n['uid'] != consts.MASTER_NODE_UID,
            deployment_data)
        cls._set_image_cache_max_size(nodes_wo_master, cluster, nodes)
        return deployment_data

    @classmethod
    def _set_image_cache_max_size(cls, data, cluster, nodes):
        nodes_db = {node.id: node for node in nodes}
        editable_attrs = objects.Cluster.get_editable_attributes(cluster)
        images_ceph = editable_attrs['storage']['images_ceph']['value']
        for node in data:
            if images_ceph:
                image_cache_max_size = '0'
            else:
                volumes = cls.get_node_volumes(nodes_db[int(node['uid'])])
                image_cache_max_size = calc_glance_cache_size(volumes)
            node.setdefault(
                'glance', {})['image_cache_max_size'] = image_cache_max_size
    def process_deployment_for_node(cls, node, node_data):
        """Adds Glance-related information to deployment info for node."""
        volumes = cls.get_node_volumes(node)
        image_cache_max_size = calc_glance_cache_size(volumes)
        glance = node_data.setdefault('glance', {})
        glance['image_cache_max_size'] = image_cache_max_size


class VolumeManagerExtension(VolumeObjectMethodsMixin, BaseExtension):

@ -217,9 +177,9 @@ class VolumeManagerExtension(VolumeObjectMethodsMixin, BaseExtension):
        VolumeObject.delete_by_node_ids(node_ids)

    @classmethod
    def on_before_deployment_check(cls, cluster):
        cls._check_disks(cluster)
        cls._check_volumes(cluster)
    def on_before_deployment_check(cls, cluster, nodes):
        cls._check_disks(nodes)
        cls._check_volumes(nodes)

    @classmethod
    def _is_disk_checking_required(cls, node):

@ -238,23 +198,28 @@ class VolumeManagerExtension(VolumeObjectMethodsMixin, BaseExtension):
        return True

    @classmethod
    def _check_disks(cls, cluster):
        try:
            for node in cluster.nodes:
                if cls._is_disk_checking_required(node):
                    VolumeManager(node).check_disk_space_for_deployment()
        except errors.NotEnoughFreeSpace:
            raise errors.NotEnoughFreeSpace(
                u"Node '{}' has insufficient disk space".format(
                    node.human_readable_name))
    def _check_disks(cls, nodes):
        cls._check_spaces(
            nodes, lambda vm: vm.check_disk_space_for_deployment()
        )

    @classmethod
    def _check_volumes(cls, cluster):
        try:
            for node in cluster.nodes:
                if cls._is_disk_checking_required(node):
                    VolumeManager(node).check_volume_sizes_for_deployment()
        except errors.NotEnoughFreeSpace as e:
            raise errors.NotEnoughFreeSpace(
                u"Node '{}' has insufficient disk space\n{}".format(
                    node.human_readable_name, e.message))
    def _check_volumes(cls, nodes):
        cls._check_spaces(
            nodes, lambda vm: vm.check_volume_sizes_for_deployment()
        )

    @classmethod
    def _check_spaces(cls, nodes, checker):
        messages = []
        for node in nodes:
            if cls._is_disk_checking_required(node):
                try:
                    checker(VolumeManager(node))
                except errors.NotEnoughFreeSpace:
                    messages.append(
                        u"Node '{}' has insufficient disk space"
                        .format(node.human_readable_name)
                    )
        if messages:
            raise errors.NotEnoughFreeSpace(u'\n'.join(messages))
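A behavioural improvement hidden in this refactoring: the old _check_disks and _check_volumes raised on the first failing node, while _check_spaces collects every failing node before raising, so the operator sees all problems at once. A sketch of the resulting error, assuming two undersized nodes named node-1 and node-2:

try:
    VolumeManagerExtension._check_disks(nodes)
except errors.NotEnoughFreeSpace as exc:
    print(exc)
    # Node 'node-1' has insufficient disk space
    # Node 'node-2' has insufficient disk space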

View File

@ -161,11 +161,13 @@ def calc_glance_cache_size(volumes):
    Based on formula:
    10%*(/var/lib/glance) if > 5GB else 5GB
    """
    cache_size_form = lambda size: int(0.1 * mb_to_byte(size))
    cache_min_size = gb_to_byte(5)
    glance_mount_size = find_size_by_name(volumes, 'glance', 'image')
    cache_size = cache_size_form(glance_mount_size)
    return str(cache_size if cache_size > cache_min_size else cache_min_size)
    if glance_mount_size == 0:
        return '0'
    cache_size = int(0.1 * mb_to_byte(glance_mount_size))
    cache_min_size = gb_to_byte(5)
    return str(max(cache_size, cache_min_size))


def get_logical_volumes_by_name(volumes, name, id_type):

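The rewrite makes the formula easier to follow: the cache is 10% of the glance volume, floored at 5 GB, and '0' when there is no glance volume at all. A worked example, under the assumption that mb_to_byte(x) returns x * 1024 ** 2 and gb_to_byte(x) returns x * 1024 ** 3:

glance_mb = 100 * 1024                    # 100 GB glance volume, in MB
cache = int(0.1 * glance_mb * 1024 ** 2)  # 10% -> 10 GiB in bytes
assert cache > 5 * 1024 ** 3              # above the 5 GB floor,
                                          # so str(cache) is returned

glance_mb = 20 * 1024                     # 20 GB volume: 10% is 2 GiB,
cache = int(0.1 * glance_mb * 1024 ** 2)  # below the floor, so the
assert cache < 5 * 1024 ** 3              # 5 GB minimum wins instead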
View File

@ -67,14 +67,14 @@ class TestCheckBeforeDeploymentCallback(BaseTestCase):
        with mock.patch.object(
                VolumeManager,
                'check_disk_space_for_deployment') as check_mock:
            fire_callback_on_before_deployment_check(self.cluster)
            fire_callback_on_before_deployment_check(self.cluster, [self.node])
            self.assertFalse(check_mock.called)

        with mock.patch.object(
                VolumeManager,
                'check_volume_sizes_for_deployment') as check_mock:
            fire_callback_on_before_deployment_check(self.cluster)
            fire_callback_on_before_deployment_check(self.cluster, [self.node])
            self.assertFalse(check_mock.called)

@ -84,13 +84,13 @@ class TestCheckBeforeDeploymentCallback(BaseTestCase):
        with mock.patch.object(
                VolumeManager,
                'check_disk_space_for_deployment') as check_mock:
            fire_callback_on_before_deployment_check(self.cluster)
            fire_callback_on_before_deployment_check(self.cluster, [self.node])
            self.assertEqual(check_mock.call_count, 1)

        with mock.patch.object(
                VolumeManager,
                'check_volume_sizes_for_deployment') as check_mock:
            fire_callback_on_before_deployment_check(self.cluster)
            fire_callback_on_before_deployment_check(self.cluster, [self.node])
            self.assertEqual(check_mock.call_count, 1)

View File

@ -22,8 +22,7 @@ from distutils.version import StrictVersion
import six

from nailgun import consts
from nailgun.extensions import fire_callback_on_before_deployment_serialization
from nailgun.extensions import fire_callback_on_deployment_data_serialization
from nailgun import extensions
from nailgun.logger import logger
from nailgun import objects
from nailgun.plugins import adapters

@ -72,6 +71,11 @@ class DeploymentMultinodeSerializer(object):
        """Method generates facts which are passed to puppet."""
        try:
            self.initialize(cluster)
            common_attrs = self.get_common_attrs(cluster)
            extensions.fire_callback_on_cluster_serialization_for_deployment(
                cluster, common_attrs
            )

            serialized_nodes = []
            origin_nodes = []

@ -86,10 +90,10 @@ class DeploymentMultinodeSerializer(object):
                    origin_nodes.append(node)

            serialized_nodes.extend(
                self.serialize_generated(cluster, origin_nodes)
                self.serialize_generated(common_attrs, origin_nodes)
            )
            serialized_nodes.extend(
                self.serialize_customized(cluster, customized_nodes)
                self.serialize_customized(common_attrs, customized_nodes)
            )

            # NOTE(dshulyak) tasks should not be preserved from replaced

@ -102,15 +106,21 @@ class DeploymentMultinodeSerializer(object):
        return serialized_nodes

    def serialize_generated(self, cluster, nodes):
        nodes = self.serialize_nodes(nodes)
        common_attrs = self.get_common_attrs(cluster)
    def serialize_generated(self, common_attrs, nodes):
        serialized_nodes = self.serialize_nodes(common_attrs, nodes)
        nodes_map = {n.uid: n for n in nodes}

        self.set_deployment_priorities(nodes)
        for node in nodes:
            yield utils.dict_merge(node, common_attrs)
        self.set_deployment_priorities(serialized_nodes)
        for node_data in serialized_nodes:
            # the serialized nodes may contain fake nodes like the master
            # node, which has no related DB object; such nodes must be
            # excluded
            if node_data['uid'] in nodes_map:
                extensions.fire_callback_on_node_serialization_for_deployment(
                    nodes_map[node_data['uid']], node_data
                )
            yield utils.dict_merge(common_attrs, node_data)

    def serialize_customized(self, cluster, nodes):
    def serialize_customized(self, common_attrs, nodes):
        for node in nodes:
            for role_data in node.replaced_deployment_info:
                yield role_data

@ -175,7 +185,7 @@ class DeploymentMultinodeSerializer(object):
    def not_roles(self, nodes, roles):
        return filter(lambda node: node['role'] not in roles, nodes)

    def serialize_nodes(self, nodes):
    def serialize_nodes(self, common_attrs, nodes):
        """Serialize node for each role.

        For example if node has two roles then

@ -185,10 +195,12 @@ class DeploymentMultinodeSerializer(object):
        serialized_nodes = []
        for node in nodes:
            for role in objects.Node.all_roles(node):
                serialized_nodes.append(self.serialize_node(node, role))
                serialized_nodes.append(
                    self.serialize_node(common_attrs, node, role)
                )
        return serialized_nodes

    def serialize_node(self, node, role):
    def serialize_node(self, common_attrs, node, role):
        """Serialize node, then it will be merged with common attributes."""
        node_attrs = {
            # Yes, uid really should be a string

@ -408,9 +420,9 @@ class DeploymentMultinodeSerializer61(DeploymentMultinodeSerializer,
    neutron_network_serializer = \
        neutron_serializers.NeutronNetworkDeploymentSerializer61

    def serialize_node(self, node, role):
        serialized_node = super(
            DeploymentMultinodeSerializer61, self).serialize_node(node, role)
    def serialize_node(self, common_attrs, node, role):
        base = super(DeploymentMultinodeSerializer61, self)
        serialized_node = base.serialize_node(common_attrs, node, role)
        serialized_node['user_node_name'] = node.name
        serialized_node.update(self.generate_vmware_data(node))

@ -433,9 +445,9 @@ class DeploymentHASerializer61(DeploymentHASerializer,
    neutron_network_serializer = \
        neutron_serializers.NeutronNetworkDeploymentSerializer61

    def serialize_node(self, node, role):
        serialized_node = super(
            DeploymentHASerializer61, self).serialize_node(node, role)
    def serialize_node(self, common_attrs, node, role):
        base = super(DeploymentHASerializer61, self)
        serialized_node = base.serialize_node(common_attrs, node, role)
        serialized_node['user_node_name'] = node.name
        serialized_node.update(self.generate_vmware_data(node))

@ -539,10 +551,6 @@ class DeploymentHASerializer90(DeploymentHASerializer80):
            node_attrs['nova_hugepages_enabled'] = (
                objects.NodeAttributes.is_nova_hugepages_enabled(node))

        # we don't need nodes in serialized data for 9.0 environments
        # https://bugs.launchpad.net/fuel/+bug/1531128
        attrs.pop('nodes')

        return attrs

    @classmethod

@ -552,9 +560,9 @@ class DeploymentHASerializer90(DeploymentHASerializer80):
        else:
            return neutron_serializers.NeutronNetworkDeploymentSerializer90

    def serialize_node(self, node, role):
        serialized_node = super(
            DeploymentHASerializer90, self).serialize_node(node, role)
    def serialize_node(self, common_attrs, node, role):
        base = super(DeploymentHASerializer90, self)
        serialized_node = base.serialize_node(common_attrs, node, role)
        self.serialize_node_attributes(node, serialized_node)
        return serialized_node

@ -668,7 +676,7 @@ class DeploymentLCMSerializer(DeploymentHASerializer90):
        attrs['release'] = objects.Release.to_dict(cluster.release)
        return attrs

    def serialize_customized(self, cluster, nodes):
    def serialize_customized(self, common_attrs, nodes):
        for node in nodes:
            data = {}
            roles = []

@ -678,17 +686,19 @@ class DeploymentLCMSerializer(DeploymentHASerializer90):
                # of old serialized info, the old info
                # has serialized data per role
                roles.append(role_data.pop('role'))
                data = utils.dict_merge(data, role_data)
                utils.dict_update(data, role_data)
            if roles:
                data['roles'] = roles
            yield data

    def serialize_nodes(self, nodes):
    def serialize_nodes(self, common_attrs, nodes):
        serialized_nodes = []
        for node in nodes:
            roles = objects.Node.all_roles(node)
            if roles:
                serialized_nodes.append(self.serialize_node(node, roles))
                serialized_nodes.append(
                    self.serialize_node(common_attrs, node, roles)
                )
        # added master node
        serialized_nodes.append({
            'uid': consts.MASTER_NODE_UID,

@ -696,12 +706,12 @@ class DeploymentLCMSerializer(DeploymentHASerializer90):
        })
        return serialized_nodes

    def serialize_node(self, node, roles):
    def serialize_node(self, common_attrs, node, roles):
        # serialize all roles to one config
        # Since there are no role-dependent things except
        # OpenStack configs, we can do this
        serialized_node = super(
            DeploymentLCMSerializer, self).serialize_node(node, roles[0])
        base = super(DeploymentLCMSerializer, self)
        serialized_node = base.serialize_node(common_attrs, node, roles[0])
        del serialized_node['role']
        serialized_node['roles'] = roles
        serialized_node['fail_if_error'] = bool(

@ -814,42 +824,15 @@ def get_serializer_for_cluster(cluster):
    return serializers_map[latest_version][env_mode]


def _execute_pipeline(data, cluster, nodes, ignore_customized):
    "Executes pipelines depending on ignore_customized boolean."
    if ignore_customized:
        return fire_callback_on_deployment_data_serialization(
            data, cluster, nodes)

    nodes_without_customized = {n.uid: n for n in nodes
                                if not n.replaced_deployment_info}

    def keyfunc(node):
        return node['uid'] in nodes_without_customized

    # not customized nodes
    nodes_data_for_pipeline = list(six.moves.filter(keyfunc, data))

    # NOTE(sbrzeczkowski): pipelines must be executed for nodes
    # which don't have replaced_deployment_info specified
    updated_data = fire_callback_on_deployment_data_serialization(
        nodes_data_for_pipeline, cluster,
        list(six.itervalues(nodes_without_customized)))

    # customized nodes
    updated_data.extend(six.moves.filterfalse(keyfunc, data))
    return updated_data


def _invoke_serializer(serializer, cluster, nodes, ignore_customized):
    fire_callback_on_before_deployment_serialization(
    extensions.fire_callback_on_before_deployment_serialization(
        cluster, cluster.nodes, ignore_customized
    )
    objects.Cluster.set_primary_roles(cluster, nodes)
    data = serializer.serialize(
    return serializer.serialize(
        cluster, nodes, ignore_customized=ignore_customized
    )
    return _execute_pipeline(data, cluster, nodes, ignore_customized)


def serialize(orchestrator_graph, cluster, nodes, ignore_customized=False):
def serialize(orchestrator_graph, cluster, nodes, ignore_customized=False):

View File

@ -22,9 +22,7 @@ import netaddr
import six

from nailgun import consts
from nailgun.extensions import \
    fire_callback_on_before_provisioning_serialization
from nailgun.extensions import fire_callback_on_provisioning_data_serialization
from nailgun import extensions
from nailgun.logger import logger
from nailgun import objects
from nailgun.orchestrator.base_serializers import MellanoxMixin

@ -54,25 +52,48 @@ class ProvisioningSerializer(MellanoxMixin):
            serialized_nodes.extend(
                cls.serialize_nodes(cluster_attrs, node_group))

        serialized_info = (cluster.replaced_provisioning_info or
                           cls.serialize_cluster_info(cluster_attrs, nodes))
        serialized_info['fault_tolerance'] = cls.fault_tolerance(cluster,
                                                                 nodes)
        if cluster.replaced_provisioning_info:
            serialized_info = cluster.replaced_provisioning_info
        else:
            serialized_info = cls.serialize_cluster_info(
                cluster, cluster_attrs
            )
        serialized_info['fault_tolerance'] = cls.fault_tolerance(
            cluster, nodes
        )
        serialized_info['nodes'] = serialized_nodes
        return serialized_info

    @classmethod
    def serialize_cluster_info(cls, cluster_attrs, nodes):
    def serialize_node_info(cls, cluster_attrs, node):
        data = cls.serialize_node(cluster_attrs, node)
        extensions.fire_callback_on_node_serialization_for_provisioning(
            node, data
        )
        return data

    @classmethod
    def serialize_cluster_info(cls, cluster, cluster_attrs):
        data = cls.serialize_cluster(cluster, cluster_attrs)
        extensions.fire_callback_on_cluster_serialization_for_provisioning(
            cluster, data
        )
        return data

    @classmethod
    def serialize_cluster(cls, cluster, cluster_attrs):
        return {
            'engine': {
                'url': settings.COBBLER_URL,
                'username': settings.COBBLER_USER,
                'password': settings.COBBLER_PASSWORD,
                'master_ip': settings.MASTER_IP,
            }}
            }
        }

    @classmethod
    def serialize_customized(self, nodes):
    def serialize_customized(cls, nodes):
        serialized = []
        for node in nodes:
            serialized.append(node.replaced_provisioning_info)

@ -83,7 +104,8 @@ class ProvisioningSerializer(MellanoxMixin):
        """Serialize nodes."""
        serialized_nodes = []
        for node in nodes:
            serialized_nodes.append(cls.serialize_node(cluster_attrs, node))
            node_data = cls.serialize_node_info(cluster_attrs, node)
            serialized_nodes.append(node_data)
        return serialized_nodes

    @classmethod

@ -352,46 +374,17 @@ def get_serializer_for_cluster(cluster):
    return ProvisioningSerializer90


def _execute_pipeline(data, cluster, nodes, ignore_customized):
    "Executes pipelines depending on ignore_customized boolean."
    if ignore_customized:
        return fire_callback_on_provisioning_data_serialization(
            data, cluster, nodes)

    nodes_without_customized = {n.uid: n for n in nodes
                                if not n.replaced_provisioning_info}

    def keyfunc(node):
        return node['uid'] in nodes_without_customized

    temp_nodes = data['nodes']

    # not customized nodes
    data['nodes'] = list(six.moves.filter(keyfunc, temp_nodes))

    # NOTE(sbrzeczkowski): pipelines must be executed for nodes
    # which don't have replaced_provisioning_info specified
    updated_data = fire_callback_on_provisioning_data_serialization(
        data, cluster, list(six.itervalues(nodes_without_customized)))

    # customized nodes
    updated_data['nodes'].extend(six.moves.filterfalse(keyfunc, temp_nodes))
    return updated_data


def serialize(cluster, nodes, ignore_customized=False):
    """Serialize cluster for provisioning."""
    fire_callback_on_before_provisioning_serialization(
    extensions.fire_callback_on_before_provisioning_serialization(
        cluster, nodes, ignore_customized
    )

    serializer = get_serializer_for_cluster(cluster)
    data = serializer.serialize(
        cluster, nodes, ignore_customized=ignore_customized)
    return _execute_pipeline(data, cluster, nodes, ignore_customized)
    return serializer.serialize(
        cluster, nodes, ignore_customized=ignore_customized
    )


class ProvisioningSerializer70(ProvisioningSerializer61):

View File

@ -1469,7 +1469,9 @@ class CheckBeforeDeploymentTask(object):

    @classmethod
    def execute(cls, task):
        fire_callback_on_before_deployment_check(task.cluster)
        fire_callback_on_before_deployment_check(
            task.cluster, TaskHelper.nodes_to_deploy(task.cluster)
        )
        cls._check_nodes_are_online(task)
        cls._check_nodes_roles(task)

View File

@ -77,6 +77,7 @@ class OrchestratorSerializerTestBase(BaseSerializerTest):
        self.cluster_mock.id = 0
        self.cluster_mock.deployment_tasks = []
        self.cluster_mock.release.deployment_tasks = []
        self.common_attrs = mock.MagicMock()

    def filter_by_role(self, nodes, role):
        return filter(lambda node: role in node['role'], nodes)

@ -213,7 +214,9 @@ class TestNovaOrchestratorSerializer(OrchestratorSerializerTestBase):
        self.assert_nodes_with_role(nodes, 'mongo', 1)

    def test_serialize_nodes(self):
        serialized_nodes = self.serializer.serialize_nodes(self.cluster.nodes)
        serialized_nodes = self.serializer.serialize_nodes(
            self.common_attrs, self.cluster.nodes
        )
        self.assert_roles_flattened(serialized_nodes)

        # Each node should be the same as the result of

@ -222,7 +225,8 @@ class TestNovaOrchestratorSerializer(OrchestratorSerializerTestBase):
            node_db = self.db.query(Node).get(int(serialized_node['uid']))
            expected_node = self.serializer.serialize_node(
                node_db, serialized_node['role'])
                self.common_attrs, node_db, serialized_node['role']
            )
            self.assertEqual(serialized_node, expected_node)

    def test_serialize_node(self):

@ -234,7 +238,9 @@ class TestNovaOrchestratorSerializer(OrchestratorSerializerTestBase):
        node_db = self.db.query(Node).get(node['id'])

        serialized_data = self.serializer.serialize_node(node_db, 'controller')
        serialized_data = self.serializer.serialize_node(
            self.common_attrs, node_db, 'controller'
        )

        self.assertEqual(serialized_data['role'], 'controller')
        self.assertEqual(serialized_data['uid'], str(node_db.id))

@ -254,7 +260,9 @@ class TestNovaOrchestratorSerializer(OrchestratorSerializerTestBase):
        vms_conf = [{'id': 1, 'cluster_id': self.cluster.id}]
        node_db.vms_conf = vms_conf

        serialized_data = self.serializer.serialize_node(node_db, 'controller')
        serialized_data = self.serializer.serialize_node(
            self.common_attrs, node_db, 'controller'
        )

        self.assertEqual(serialized_data['vms_conf'], vms_conf)

    def test_node_list(self):

@ -426,7 +434,9 @@ class TestNovaOrchestratorSerializer(OrchestratorSerializerTestBase):
        self.cluster_mock.release.environment_version = '5.0'
        serializer = DeploymentMultinodeSerializer(
            AstuteGraph(self.cluster_mock))
        serialized_nodes = serializer.serialize_nodes(self.cluster.nodes)
        serialized_nodes = serializer.serialize_nodes(
            self.common_attrs, self.cluster.nodes
        )
        # primary-controller is not critical for MultiNode serializer
        expected_ciritial_roles = [
            {'fail_if_error': False, 'role': 'cinder'},

@ -1306,7 +1316,9 @@ class TestNovaOrchestratorHASerializer(OrchestratorSerializerTestBase):
        self.assertEqual(expected_priorities, nodes)

    def test_set_critital_node(self):
        serialized_nodes = self.serializer.serialize_nodes(self.cluster.nodes)
        serialized_nodes = self.serializer.serialize_nodes(
            self.common_attrs, self.cluster.nodes
        )
        expected_ciritial_roles = [
            {'fail_if_error': True, 'role': 'primary-controller'},
            {'fail_if_error': True, 'role': 'controller'},

@ -1508,7 +1520,9 @@ class TestNeutronOrchestratorSerializer(OrchestratorSerializerTestBase):
        )

    def test_serialize_nodes(self):
        serialized_nodes = self.serializer.serialize_nodes(self.cluster.nodes)
        serialized_nodes = self.serializer.serialize_nodes(
            self.common_attrs, self.cluster.nodes
        )
        self.assert_roles_flattened(serialized_nodes)

        # Each node should be the same as the result of

@ -1517,7 +1531,8 @@ class TestNeutronOrchestratorSerializer(OrchestratorSerializerTestBase):
            node_db = self.db.query(Node).get(int(serialized_node['uid']))
            expected_node = self.serializer.serialize_node(
                node_db, serialized_node['role'])
                self.common_attrs, node_db, serialized_node['role']
            )
            self.assertEqual(serialized_node, expected_node)

    def test_neutron_vlan_ids_tag_present_on_6_0_env(self):

@ -1586,7 +1601,9 @@ class TestNeutronOrchestratorSerializer(OrchestratorSerializerTestBase):
        objects.Cluster.prepare_for_deployment(self.cluster)
        node_db = self.db.query(Node).get(node['id'])

        serialized_data = self.serializer.serialize_node(node_db, 'controller')
        serialized_data = self.serializer.serialize_node(
            self.common_attrs, node_db, 'controller'
        )

        self.assertEqual(serialized_data['role'], 'controller')
        self.assertEqual(serialized_data['uid'], str(node_db.id))

@ -2304,8 +2321,12 @@ class TestMongoNodesSerialization(OrchestratorSerializerTestBase):
    def test_mongo_roles_equals_in_defferent_modes(self):
        cluster = self.create_env()

        ha_nodes = self.serializer_ha.serialize_nodes(cluster.nodes)
        mn_nodes = self.serializer_mn.serialize_nodes(cluster.nodes)
        ha_nodes = self.serializer_ha.serialize_nodes(
            self.common_attrs, cluster.nodes
        )
        mn_nodes = self.serializer_mn.serialize_nodes(
            self.common_attrs, cluster.nodes
        )
        self.assertEqual(mn_nodes, ha_nodes)

@ -2367,6 +2388,10 @@ class BaseDeploymentSerializer(BaseSerializerTest):
    serializer = None
    env_version = '2014.2-6.1'

    def setUp(self):
        super(BaseDeploymentSerializer, self).setUp()
        self.common_attrs = mock.MagicMock()

    def create_env(self, mode):
        if mode == consts.CLUSTER_MODES.multinode:
            available_modes = [consts.CLUSTER_MODES.ha_compact,

@ -2393,7 +2418,8 @@ class BaseDeploymentSerializer(BaseSerializerTest):
    def check_serialize_node(self):
        self.assertEqual(
            self.serializer.serialize_node(
                self.env.nodes[0], 'role')['user_node_name'],
                self.common_attrs, self.env.nodes[0], 'role'
            )['user_node_name'],
            self.node_name)

    def check_serialize_node_for_node_list(self):

@ -2459,7 +2485,8 @@ class BaseDeploymentSerializer(BaseSerializerTest):
        self.db.flush()

        result = self.serializer.serialize_node(
            self.env.nodes[0], 'controller')
            self.common_attrs, self.env.nodes[0], 'controller'
        )
        self.assertEqual(len(result['vcenter']['computes']), 4)

@ -2684,6 +2711,7 @@ class TestSerializeInterfaceDriversData(base.BaseIntegrationTest):

    def setUp(self):
        super(TestSerializeInterfaceDriversData, self).setUp()
        self.common_attrs = mock.MagicMock()

    def _create_cluster_for_interfaces(self, driver_mapping={},
                                       bus_mapping={},

@ -2724,8 +2752,9 @@ class TestSerializeInterfaceDriversData(base.BaseIntegrationTest):
            self._create_cluster_for_interfaces(driver_mapping, bus_mapping)
        self.db.commit()
        cluster_db = self.db.query(Cluster).get(cluster['id'])
        node = self.serializer.serialize_node(cluster_db.nodes[0],
                                              'controller')
        node = self.serializer.serialize_node(
            self.common_attrs, cluster_db.nodes[0], 'controller'
        )
        interfaces = node['network_scheme']['interfaces']
        for iface, iface_attrs in interfaces.items():
            self.assertIn('vendor_specific', iface_attrs)

@ -2758,8 +2787,9 @@ class TestSerializeInterfaceDriversData(base.BaseIntegrationTest):
        self.db.commit()
        cluster_db = self.db.query(Cluster).get(cluster['id'])
        node = self.serializer.serialize_node(cluster_db.nodes[0],
                                              'controller')
        node = self.serializer.serialize_node(
            self.common_attrs, cluster_db.nodes[0], 'controller'
        )
        endpoints = node['network_scheme']['endpoints']
        net_roles = node['network_scheme']['roles']
        for net_role, bridge in net_roles.items():

View File

@ -528,7 +528,8 @@ class TestDeploymentAttributesSerialization70(
        self.check_generate_vmware_attributes_data()

        result = self.serializer.serialize_node(
            self.env.nodes[0], 'compute-vmware')
            self.common_attrs, self.env.nodes[0], 'compute-vmware'
        )

        self.assertEqual(
            result['vcenter']['computes'][0]['target_node'],

@ -690,7 +691,8 @@ class TestDeploymentSerializationForNovaNetwork70(
        self.check_generate_vmware_attributes_data()

        result = self.serializer.serialize_node(
            self.env.nodes[0], 'compute-vmware')
            self.common_attrs, self.env.nodes[0], 'compute-vmware'
        )

        self.assertEqual(
            result['vcenter']['computes'][0]['target_node'],

View File

@ -654,13 +654,6 @@ class TestDeploymentHASerializer90(
        for item in serialized:
            self.assertIn(item, cust_serialized)

    def test_remove_nodes_from_common_attrs(self):
        cluster_db = self.env.clusters[0]
        serializer = self.create_serializer(cluster_db)
        common_attrs = serializer.get_common_attrs(cluster_db)
        self.assertNotIn('nodes', common_attrs)

class TestDeploymentTasksSerialization90(
TestSerializer90Mixin,

View File

@ -14,7 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.

import copy
import mock

from nailgun import errors

@ -29,9 +28,6 @@ from nailgun.extensions import fire_callback_on_node_update
from nailgun.extensions import get_extension
from nailgun.extensions import node_extension_call
from nailgun.extensions import setup_yaql_context
from nailgun.orchestrator import deployment_serializers
from nailgun.orchestrator import orchestrator_graph
from nailgun.orchestrator import provisioning_serializers
from nailgun.test.base import BaseTestCase

@ -215,195 +211,34 @@ class TestExtensionUtils(BaseTestCase):


class TestPipeline(BaseExtensionCase):

    def _create_cluster_with_extensions(self, nodes_kwargs=None):
        if nodes_kwargs is None:
            nodes_kwargs = [
                {'roles': ['controller'], 'pending_addition': True},
            ]

        cluster = self.env.create(
            cluster_kwargs={'api': False},
            nodes_kwargs=nodes_kwargs)
        cluster.extensions = [self.extension.name, 'volume_manager']
        self.db.flush()
        return cluster

    @mock.patch.object(orchestrator_graph.AstuteGraph, 'deploy_task_serialize')
    def test_deployment_serialization_ignore_customized(self, _):
        cluster = self._create_cluster_with_extensions()

        data = [{"uid": n.uid} for n in cluster.nodes]
        mserializer = mock.MagicMock()
        mserializer.return_value = mock.MagicMock()
        mserializer.return_value.serialize.return_value = data

        with mock.patch(
                'nailgun.orchestrator.deployment_serializers.'
                'get_serializer_for_cluster',
                return_value=mserializer):
            with mock.patch('nailgun.orchestrator.deployment_serializers.'
                            'fire_callback_on_deployment_data_serialization'
                            ) as mfire_callback:

                replaced_data = ["it's", "something"]
                with mock.patch.object(
                        cluster.nodes[0], 'replaced_deployment_info',
                        new_callable=mock.Mock(return_value=replaced_data)):

                    graph = orchestrator_graph.AstuteGraph(cluster)
                    deployment_serializers.serialize(
                        graph, cluster, cluster.nodes, ignore_customized=True)

        mfire_callback.assert_called_once_with(data, cluster, cluster.nodes)

    @mock.patch.object(orchestrator_graph.AstuteGraph, 'deploy_task_serialize')
    def test_deployment_serialization_ignore_customized_false(self, _):
        cluster = self._create_cluster_with_extensions(
            nodes_kwargs=[
                {'roles': ['controller'], 'pending_addition': True},
                {'roles': ['controller'], 'pending_addition': True},
                {'roles': ['controller'], 'pending_addition': True},
                {'roles': ['controller'], 'pending_addition': True},
            ]
    def test_cannot_instantiate_class_with_method_process_provision(self):
        with self.assertRaises(RuntimeError) as ctx:
            class DeprecatedPipeline(BasePipeline):
                @classmethod
                def process_provisioning(cls, *args, **kwargs):
                    pass

        self.assertIn(
            'Please implement methods process_provisioning_for_* instead',
            str(ctx.exception)
        )

        data = [{"uid": n.uid} for n in cluster.nodes]
        expected_data = copy.deepcopy(data[1:])

        mserializer = mock.MagicMock()
        mserializer.return_value = mock.MagicMock()
        mserializer.return_value.serialize.return_value = data

        with mock.patch(
                'nailgun.orchestrator.deployment_serializers.'
                'get_serializer_for_cluster',
                return_value=mserializer):
            with mock.patch('nailgun.orchestrator.deployment_serializers.'
                            'fire_callback_on_deployment_data_serialization',
                            ) as mfire_callback:

                replaced_data = ["it's", "something"]
                with mock.patch.object(
                        cluster.nodes[0], 'replaced_deployment_info',
                        new_callable=mock.Mock(return_value=replaced_data)):

                    graph = orchestrator_graph.AstuteGraph(cluster)
                    deployment_serializers.serialize(
                        graph, cluster, cluster.nodes, ignore_customized=False)

        self.assertEqual(mfire_callback.call_args[0][0], expected_data)
        self.assertIs(mfire_callback.call_args[0][1], cluster)
        self.assertItemsEqual(
            mfire_callback.call_args[0][2], cluster.nodes[1:])

    def test_provisioning_serialization_ignore_customized(self):
        cluster = self._create_cluster_with_extensions()

        data = {"nodes": cluster.nodes}
        mserializer = mock.MagicMock()
        mserializer.serialize.return_value = data

        with mock.patch(
                'nailgun.orchestrator.provisioning_serializers.'
                'get_serializer_for_cluster',
                return_value=mserializer):
            with mock.patch('nailgun.orchestrator.provisioning_serializers.'
                            'fire_callback_on_provisioning_data_serialization'
                            ) as mfire_callback:

                replaced_data = {"it's": "something"}
                with mock.patch.object(
                        cluster.nodes[0], 'replaced_provisioning_info',
                        new_callable=mock.Mock(return_value=replaced_data)):

                    provisioning_serializers.serialize(
                        cluster, cluster.nodes, ignore_customized=True)

        mfire_callback.assert_called_once_with(data, cluster, cluster.nodes)

    def test_provisioning_serialization_ignore_customized_false(self):
        cluster = self._create_cluster_with_extensions(
            nodes_kwargs=[
                {'roles': ['controller'], 'pending_addition': True},
                {'roles': ['controller'], 'pending_addition': True},
                {'roles': ['controller'], 'pending_addition': True},
                {'roles': ['controller'], 'pending_addition': True},
            ]
    def test_cannot_instantiate_class_with_method_process_deployment(self):
        with self.assertRaises(RuntimeError) as ctx:
            class DeprecatedPipeline(BasePipeline):
                @classmethod
                def process_deployment(cls, *args, **kwargs):
                    pass

        self.assertIn(
            'Please implement methods process_deployment_for_* instead',
            str(ctx.exception)
        )

        data = {"nodes": [{"uid": n.uid} for n in cluster.nodes]}
        expected_data = {"nodes": copy.deepcopy(data["nodes"][1:])}

        mserializer = mock.MagicMock()
        mserializer.serialize.return_value = data

        with mock.patch(
                'nailgun.orchestrator.provisioning_serializers.'
                'get_serializer_for_cluster',
                return_value=mserializer):
            with mock.patch('nailgun.orchestrator.provisioning_serializers.'
                            'fire_callback_on_provisioning_data_serialization'
                            ) as mfire_callback:

                replaced_data = {"it's": "something"}
                with mock.patch.object(
                        cluster.nodes[0], 'replaced_provisioning_info',
                        new_callable=mock.Mock(return_value=replaced_data)):

                    provisioning_serializers.serialize(
                        cluster, cluster.nodes, ignore_customized=False)

        self.assertEqual(mfire_callback.call_args[0][0], expected_data)
        self.assertIs(mfire_callback.call_args[0][1], cluster)
        self.assertItemsEqual(
            mfire_callback.call_args[0][2], cluster.nodes[1:])

    def test_pipeline_change_data(self):
        cluster = self.env.create(
            cluster_kwargs={'api': False},
            nodes_kwargs=[{'roles': ['controller'], 'pending_addition': True}]
        )
        cluster.extensions = [self.extension.name]
        self.db.flush()

        class PipelinePlus1(BasePipeline):
    def test_no_error_if_no_deprecated_methods(self):
        class TestPipeline(BasePipeline):
            @classmethod
            def process_deployment_for_cluster(cls, cluster, cluster_data):
                pass

            @classmethod
            def process_provisioning(cls, data, cluster, nodes, **kwargs):
                data['key'] += 1
                return data

        class PipelinePlus2(BasePipeline):
            @classmethod
            def process_provisioning(cls, data, cluster, nodes, **kwargs):
                data['key'] += 2
                return data

        class Extension(BaseExtension):
            name = 'ext_name'
            version = '1.0.0'
            description = 'ext description'
            data_pipelines = (PipelinePlus1, PipelinePlus2)

        extension = Extension()
        cluster.extensions = [extension.name]
        self.db.flush()

        data = {'key': 0, 'nodes': []}
        mserializer = mock.MagicMock()
        mserializer.serialize.return_value = data

        with mock.patch('nailgun.extensions.manager.get_all_extensions',
                        return_value=[extension]):
            with mock.patch('nailgun.orchestrator.provisioning_serializers.'
                            'get_serializer_for_cluster',
                            return_value=mserializer):
                new_data = provisioning_serializers.serialize(
                    cluster, cluster.nodes)

        self.assertEqual(new_data['key'], 3)
            def process_deployment_for_node(cls, node, node_data):
                pass

View File

@ -687,7 +687,7 @@ class TestCheckBeforeDeploymentTask(BaseTestCase):
        task.CheckBeforeDeploymentTask, '_check_dpdk_properties')
    def test_execute_w_old_release(self, dpdk_m, sriov_m, callback_m):
        task.CheckBeforeDeploymentTask.execute(self.task)
        callback_m.assert_called_once_with(self.cluster)
        callback_m.assert_called_once_with(self.cluster, [])
        self.assertEqual(0, dpdk_m.call_count)
        self.assertEqual(0, sriov_m.call_count)