Add upload configuration task

* Add configuration task manager
* Add task serializer for upload configuration task
* Add upload_config_resp rpc receiver

Change-Id: I75983277538a52b286fbdae4701f1f1630d26dfc
Implements: blueprint openstack-config-change
This commit is contained in:
Alexander Saprykin 2015-11-26 17:29:23 +01:00 committed by Sergey Slipushenko
parent e696c17968
commit b08c04ade4
13 changed files with 534 additions and 2 deletions

View File

@ -435,3 +435,5 @@ OPENSTACK_CONFIG_TYPES = Enum(
'role',
'node',
)
OVERRIDE_CONFIG_BASE_PATH = '/etc/hiera/override/configuration/'

View File

@ -57,3 +57,5 @@ from nailgun.objects.network_group import NetworkGroupCollection
from nailgun.objects.cluster_plugin_link import ClusterPluginLink
from nailgun.objects.cluster_plugin_link import ClusterPluginLinkCollection
from nailgun.objects.openstack_config import OpenstackConfig
from nailgun.objects.openstack_config import OpenstackConfigCollection

View File

@ -1143,6 +1143,27 @@ class Cluster(NailgunObject):
'1', '2'
)
@classmethod
def get_nodes_to_update_config(cls, cluster, node_id=None, node_role=None):
"""Get nodes for specified cluster that should be updated.
Configuration update can be executed for all nodes in the cluster,
or for single node, or for all nodes with specified role.
This function returns list of nodes that will be updated
according to filters that were passed.
"""
query = (
cls.get_nodes_not_for_deletion(cluster)
.filter_by(status=consts.NODE_STATUSES.ready))
if node_id:
query = query.filter_by(id=node_id)
elif node_role:
query = query.filter(
models.Node.roles.any(node_role))
return query.all()
class ClusterCollection(NailgunCollection):
"""Cluster collection."""

View File

@ -0,0 +1,127 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
from nailgun import consts
from nailgun.db import db
from nailgun.db.sqlalchemy import models
from nailgun.objects import NailgunCollection
from nailgun.objects import NailgunObject
from nailgun.objects.serializers.openstack_config \
import OpenstackConfigSerializer
class OpenstackConfig(NailgunObject):
model = models.OpenstackConfig
serializer = OpenstackConfigSerializer
@classmethod
def create(cls, data):
data['config_type'] = cls._get_config_type(data)
data['is_active'] = True
config = cls.find_config(**data)
if config:
cls.delete(config)
return super(OpenstackConfig, cls).create(data)
@classmethod
def delete(cls, instance):
"""Deletes configuration.
It is required to track history of previous configurations.
This why delete operation doesn't remove a record from the database,
it sets `is_active` property to False.
"""
instance.is_active = False
db().flush()
@classmethod
def _get_config_type(cls, data):
if 'node_id' in data:
return consts.OPENSTACK_CONFIG_TYPES.node
if 'node_role' in data:
return consts.OPENSTACK_CONFIG_TYPES.role
return consts.OPENSTACK_CONFIG_TYPES.cluster
@classmethod
def _find_configs_query(cls, filters):
"""Build query to filter configurations.
Filters are applied like AND condition.
"""
query = db().query(cls.model).order_by(cls.model.id.desc())
for key, value in six.iteritems(filters):
# TODO(asaprykin): There should be a better way to check
# presence of column in the model.
field = getattr(cls.model, key, None)
if field:
query = query.filter(field == value)
return query
@classmethod
def find_config(cls, **filters):
"""Returns a single configuration for specified filters.
Example:
OpenstackConfig.find_config(cluster_id=10, node_id=12)
"""
query = cls._find_configs_query(filters)
return query.first()
@classmethod
def find_configs(cls, **filters):
"""Returns list of configurations for specified filters.
Example:
OpenstackConfig.find_configs(cluster_id=10, node_id=12)
"""
query = cls._find_configs_query(filters)
return query.all()
@classmethod
def find_configs_for_nodes(cls, cluster, nodes):
"""Returns list of configurations that should be applied.
Returns list of configurations for specified nodes that will be
applied.
"""
all_configs = cls.find_configs(cluster_id=cluster.id, is_active=True)
node_ids = set(n.id for n in nodes)
node_roles = set()
for node in nodes:
node_roles.update(node.roles)
configs = []
for config in all_configs:
if config.config_type == consts.OPENSTACK_CONFIG_TYPES.cluster:
configs.append(config)
elif (config.config_type == consts.OPENSTACK_CONFIG_TYPES.node and
config.node_id in node_ids):
configs.append(config)
elif (config.config_type ==
consts.OPENSTACK_CONFIG_TYPES.role and
config.node_role in node_roles):
configs.append(config)
return configs
class OpenstackConfigCollection(NailgunCollection):
single = OpenstackConfig

View File

@ -0,0 +1,29 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nailgun.objects.serializers.base import BasicSerializer
class OpenstackConfigSerializer(BasicSerializer):
fields = (
'id',
'is_active',
'config_type',
'cluster_id',
'node_id',
'node_role',
'created_at',
'configuration'
)

View File

@ -15,6 +15,8 @@
# under the License.
import abc
from collections import defaultdict
import os
import six
import yaml
@ -145,7 +147,8 @@ class StandartConfigRolesHook(ExpressionBasedTask):
super(StandartConfigRolesHook, self).__init__(task, cluster)
def get_uids(self):
return get_uids_for_roles(self.nodes, self.task['role'])
roles = self.task.get('role', self.task.get('groups'))
return get_uids_for_roles(self.nodes, roles)
def serialize(self):
uids = self.get_uids()
@ -370,6 +373,52 @@ class UpdateHosts(GenericRolesHook):
yield templates.make_puppet_task(uids, self.task)
class UploadConfiguration(GenericRolesHook):
"""Hook that uploads yaml file with configuration on nodes."""
identity = 'upload_configuration'
def __init__(self, task, cluster, nodes, configs=None):
super(UploadConfiguration, self).__init__(task, cluster, nodes)
self.configs = configs
def serialize(self):
configs = self.configs
if configs is None:
configs = objects.OpenstackConfig.find_configs_for_nodes(
self.cluster, self.nodes)
node_configs = defaultdict(lambda: defaultdict(dict))
nodes_to_update = dict((node.id, node) for node in self.nodes)
for config in configs:
if config.config_type == consts.OPENSTACK_CONFIG_TYPES.cluster:
for node_id in nodes_to_update:
node_configs[node_id]['cluster'] = config.configuration
elif config.config_type == consts.OPENSTACK_CONFIG_TYPES.role:
for node in self.nodes:
if config.node_role in node.roles:
node_configs[node.id]['role'].update(
config.configuration)
elif config.config_type == consts.OPENSTACK_CONFIG_TYPES.node:
if config.node_id in nodes_to_update:
fqdn = objects.Node.get_node_fqdn(
nodes_to_update[config.node_id])
node_configs[config.node_id][fqdn] = config.configuration
for node_id in node_configs:
for config_dest in node_configs[node_id]:
path = os.path.join(consts.OVERRIDE_CONFIG_BASE_PATH,
config_dest + '.yaml')
data = {'configuration': node_configs[node_id][config_dest]}
node = nodes_to_update[node_id]
yield templates.make_upload_task(
[node.uid], path=path, data=yaml.safe_dump(data))
class TaskSerializers(object):
"""Class serves as fabric for different types of task serializers."""
@ -377,7 +426,7 @@ class TaskSerializers(object):
UploadNodesInfo, UpdateHosts, GenerateKeys,
GenerateHaproxyKeys, CopyHaproxyKeys,
GenerateCephKeys, CopyCephKeys, IronicUploadImages,
IronicCopyBootstrapKey]
IronicCopyBootstrapKey, UploadConfiguration]
deploy_serializers = [PuppetHook, CreateVMsOnCompute]
def __init__(self, stage_serializers=None, deploy_serializers=None):

View File

@ -407,6 +407,47 @@ class NailgunReceiver(object):
cls._update_action_log_entry(status, task.name, task_uuid, nodes)
@classmethod
def update_config_resp(cls, **kwargs):
"""Updates task and nodes states at the end of upload config task"""
logger.info(
"RPC method update_config_resp received: %s" %
jsonutils.dumps(kwargs))
task_uuid = kwargs['task_uuid']
message = kwargs.get('error')
status = kwargs.get('status')
progress = kwargs.get('progress')
task = objects.Task.get_by_uuid(
task_uuid,
fail_if_not_found=True,
lock_for_update=True
)
q_nodes = objects.NodeCollection.filter_by_id_list(
None, task.cache['nodes'])
# lock nodes for updating
nodes = objects.NodeCollection.lock_for_update(q_nodes).all()
if status in (consts.TASK_STATUSES.ready, consts.TASK_STATUSES.error):
for node in nodes:
node.status = consts.NODE_STATUSES.ready
node.progress = 100
if status == consts.TASK_STATUSES.error:
message = (u"Failed to update configuration on nodes:"
u" {0}.").format(', '.join(node.name for node in nodes))
logger.error(message)
notifier.notify("error", message)
db().flush()
data = {'status': status, 'progress': progress, 'message': message}
objects.Task.update(task, data)
cls._update_action_log_entry(status, task.name, task_uuid, nodes)
@classmethod
def _update_action_log_entry(cls, task_status, task_name, task_uuid,
nodes_from_resp):

View File

@ -1290,3 +1290,43 @@ class UpdateDnsmasqTaskManager(TaskManager):
tasks.UpdateDnsmasqTask
)
return task
class OpenstackConfigTaskManager(TaskManager):
def execute(self, filters):
self.check_running_task(consts.TASK_NAMES.deployment)
task = Task(name=consts.TASK_NAMES.deployment,
cluster=self.cluster,
status=consts.TASK_STATUSES.pending)
db().add(task)
nodes_to_update = objects.Cluster.get_nodes_to_update_config(
self.cluster, filters.get('node_id'), filters.get('node_role'))
message = self._call_silently(
task, tasks.UpdateOpenstackConfigTask,
self.cluster, nodes_to_update, method_name='message')
# locking task
task = objects.Task.get_by_uid(
task.id,
fail_if_not_found=True,
lock_for_update=True
)
# locking nodes
objects.NodeCollection.lock_nodes(nodes_to_update)
task.cache = copy.copy(message)
task.cache['nodes'] = [n.id for n in nodes_to_update]
for node in nodes_to_update:
node.status = consts.NODE_STATUSES.deploying
node.progress = 0
db().commit()
rpc.cast('naily', message)
return task

View File

@ -1712,6 +1712,40 @@ class UpdateDnsmasqTask(object):
)
class UpdateOpenstackConfigTask(object):
@classmethod
def message(cls, task, cluster, nodes):
configs = objects.OpenstackConfig.find_configs_for_nodes(
cluster, nodes)
refresh_on = set()
for config in configs:
refresh_on.update(config.configuration)
refreshable_tasks = objects.Cluster.get_refreshable_tasks(
cluster, refresh_on)
upload_serializer = tasks_serializer.UploadConfiguration(
task, task.cluster, nodes, configs)
tasks_to_execute = list(upload_serializer.serialize())
if refreshable_tasks:
orchestrator_graph = deployment_graph.AstuteGraph(task.cluster)
orchestrator_graph.only_tasks(refreshable_tasks)
deployment_tasks = orchestrator_graph.stage_tasks_serialize(
orchestrator_graph.graph.topology, nodes)
tasks_to_execute.extend(deployment_tasks)
rpc_message = make_astute_message(
task, 'execute_tasks', 'update_config_resp', {
'tasks': tasks_to_execute,
})
return rpc_message
if settings.FAKE_TASKS or settings.FAKE_TASKS_AMQP:
rpc.cast = fake_cast
CheckRepositoryConnectionFromMasterNodeTask\

View File

@ -68,6 +68,7 @@ from nailgun.objects import ClusterPlugins
from nailgun.objects import MasterNodeSettings
from nailgun.objects import Node
from nailgun.objects import NodeGroup
from nailgun.objects import OpenstackConfig
from nailgun.objects import Plugin
from nailgun.objects import Release
@ -116,6 +117,7 @@ class EnvironmentManager(object):
self.clusters = []
self.nodes = []
self.plugins = []
self.openstack_configs = []
self.network_manager = NetworkManager
def create(self, **kwargs):
@ -192,6 +194,22 @@ class EnvironmentManager(object):
expect_errors=expect_errors
)
def create_openstack_config(self, api=False, **kwargs):
if api:
resp = self.app.post(
reverse('OpenstackConfigCollectionHandler'),
params=jsonutils.dumps(kwargs),
headers=self.default_headers
)
self.tester.assertEqual(resp.status_code, 201)
config = resp.json_body
self.openstack_configs.append(
self.db.query(OpenstackConfig).get(config['id']))
else:
config = OpenstackConfig.create(kwargs)
db().flush()
self.openstack_configs.append(config)
def update_role(self, release_id, role_name, data, expect_errors=False):
return self.app.put(
reverse(

View File

@ -0,0 +1,94 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mock import patch
from nailgun import consts
from nailgun.task.manager import OpenstackConfigTaskManager
from nailgun.test import base
from nailgun.test.base import fake_tasks
class TestOpenstackConfigTaskManager(base.BaseIntegrationTest):
def setUp(self):
super(TestOpenstackConfigTaskManager, self).setUp()
self.env.create(
cluster_kwargs={'net_provider': 'neutron'},
release_kwargs={'version': '1111-8.0'},
nodes_kwargs=[
{'roles': ['controller'], 'status': 'ready'},
{'roles': ['compute'], 'status': 'ready'},
{'roles': ['compute'], 'status': 'ready'},
{'roles': ['compute'], 'pending_addition': True},
]
)
self.release = self.env.releases[0]
self.cluster = self.env.clusters[0]
self.nodes = self.env.nodes
self.env.create_openstack_config(
cluster_id=self.cluster.id,
configuration={
'keystone_config': {'param_a': 'cluster'},
})
self.env.create_openstack_config(
cluster_id=self.cluster.id,
node_id=self.env.nodes[0].id,
configuration={
'keystone_config': {'param_a': 'node_1'},
'nova_config': {'param_a': 'node_1'},
})
self.env.create_openstack_config(
cluster_id=self.cluster.id,
node_role='compute',
configuration={
'keystone_config': {},
})
@fake_tasks(fake_rpc=False, mock_rpc=False)
@patch('nailgun.rpc.cast')
def test_configuration_execute(self, mocked_rpc):
task_manager = OpenstackConfigTaskManager(self.cluster.id)
task = task_manager.execute({'cluster_id': self.cluster.id})
self.assertEqual(task.status, consts.TASK_STATUSES.pending)
all_node_ids = [n.id for n in self.env.nodes[:3]]
self.assertItemsEqual(task.cache['nodes'], all_node_ids)
tasks = mocked_rpc.call_args[0][1]['args']['tasks']
# 3 tasks for all ready nodes with cluster config
# 1 task for node[0] with node specific config
# 2 tasks (1 per each compute node)
self.assertEqual(len(tasks), 6)
cluster_uids = []
role_uids = []
node_uids = []
for task in tasks:
self.assertEqual('upload_file', task['type'])
if '/cluster' in task['parameters']['path']:
cluster_uids.extend(task['uids'])
if '/role' in task['parameters']['path']:
role_uids.extend(task['uids'])
if '/node' in task['parameters']['path']:
node_uids.extend(task['uids'])
self.assertItemsEqual(cluster_uids, map(str, all_node_ids))
self.assertItemsEqual(role_uids,
[self.nodes[1].uid, self.nodes[2].uid])
self.assertItemsEqual([self.nodes[0].uid], node_uids)

View File

@ -1479,6 +1479,41 @@ class TestConsumer(BaseReciverTestCase):
self.assertEqual(node2.status, consts.NODE_STATUSES.error)
self.assertEqual(node2.error_type, consts.NODE_ERRORS.provision)
def test_update_config_resp(self):
self.env.create(
cluster_kwargs={},
nodes_kwargs=[
{'api': False, 'roles': ['controller'],
'status': consts.NODE_STATUSES.deploying},
{'api': False, 'roles': ['compute'],
'status': consts.NODE_STATUSES.deploying},
])
nodes = self.env.nodes
task = Task(
uuid=str(uuid.uuid4()),
name=consts.TASK_NAMES.deployment,
cluster_id=self.env.clusters[0].id
)
task.cache = {'nodes': [nodes[0].uid, nodes[1].uid]}
self.db.add(task)
self.db.commit()
kwargs = {
'task_uuid': task.uuid,
'status': consts.TASK_STATUSES.ready,
'progress': 100
}
self.receiver.update_config_resp(**kwargs)
self.db.refresh(nodes[0])
self.db.refresh(nodes[1])
self.db.refresh(task)
self.assertEqual(nodes[0].status, consts.NODE_STATUSES.ready)
self.assertEqual(nodes[1].status, consts.NODE_STATUSES.ready)
self.assertEqual(task.status, consts.TASK_STATUSES.ready)
class TestResetEnvironment(BaseReciverTestCase):

View File

@ -220,6 +220,46 @@ class TestHooksSerializers(BaseTaskSerializationTest):
self.assertItemsEqual(serialized_uids, self.all_uids)
self.assertNotIn(discovered_node.uid, serialized_uids)
def test_upload_configuration(self):
task_config = {
'id': 'upload_configuration',
'type': 'upload_file',
'role': '*',
}
configs = [
mock.Mock(config_type=consts.OPENSTACK_CONFIG_TYPES.cluster,
configuration={'cluster': 'foo'}),
mock.Mock(config_type=consts.OPENSTACK_CONFIG_TYPES.role,
node_role='compute',
configuration={'compute': 'bar'}),
mock.Mock(config_type=consts.OPENSTACK_CONFIG_TYPES.role,
node_role='cinder',
configuration={'cinder': 'buzz'}),
mock.Mock(config_type=consts.OPENSTACK_CONFIG_TYPES.node,
node_id=self.env.nodes[0].id,
configuration={'node_0': 'quux'})
]
task = tasks_serializer.UploadConfiguration(
task_config, self.cluster, self.nodes, configs)
serialized_tasks = list(task.serialize())
self.assertEqual(len(serialized_tasks), 5)
cluster_uids = []
role_uids = []
node_uids = []
for task in serialized_tasks:
self.assertEqual('upload_file', task['type'])
if '/cluster' in task['parameters']['path']:
cluster_uids.extend(task['uids'])
if '/role' in task['parameters']['path']:
role_uids.extend(task['uids'])
if '/node' in task['parameters']['path']:
node_uids.extend(task['uids'])
self.assertItemsEqual(self.all_uids, cluster_uids)
self.assertItemsEqual([self.nodes[2].uid], role_uids)
self.assertItemsEqual([self.nodes[0].uid], node_uids)
def test_update_hosts(self):
# mark one node as ready so we can test for duplicates
self.env.nodes[0].status = consts.NODE_STATUSES.ready