Merge "Floating ip assignement support"

This commit is contained in:
Jenkins 2013-09-24 04:05:46 +00:00 committed by Gerrit Code Review
commit a077dc4a21
19 changed files with 314 additions and 125 deletions

View File

@ -51,6 +51,41 @@ User can set how many volumes will be attached to each node in a Node Group and
All volumes are attached during Cluster creation/scaling operations.
Neutron and Nova Network support
--------------------------------
An OpenStack Cluster may use either Nova Network or Neutron as its networking service. Savanna supports both, but the networking
configuration must be set explicitly when Savanna is deployed. By default Savanna behaves as if Nova Network is used.
If the OpenStack Cluster uses Neutron, the ``use_neutron`` option should be set to ``True`` in the Savanna configuration file.
.. sourcecode:: config
    use_neutron=True
The Savanna Dashboard should also be configured to support Neutron: ``SAVANNA_USE_NEUTRON`` should be set to ``True`` in the
OpenStack Dashboard ``local_settings.py`` configuration file.
.. sourcecode:: python
    SAVANNA_USE_NEUTRON=True
Floating IP Management
----------------------
Savanna needs SSH access to instances during Cluster setup. To establish a connection, Savanna may use either the fixed
or the floating IP of an Instance. By default the ``use_floating_ips`` parameter is set to ``True``, so
Savanna will use the floating IP of an Instance to connect. In this case the user has two options for making sure that
every instance gets a floating IP:
* Nova Network may be configured to assign floating IPs automatically by setting ``auto_assign_floating_ip`` to ``True`` in ``nova.conf``
* The user may specify a floating IP pool for each Node Group directly.
Note: when floating IPs are used for management (``use_floating_ips=True``), **every** instance in the Cluster must have a floating IP assigned,
otherwise Savanna will not be able to work with it.
If the ``use_floating_ips`` parameter is set to ``False``, Savanna will use the Instances' fixed IPs for management. In this case
the node where Savanna is running must have access to the Instances' fixed IP network. When OpenStack uses Neutron for
networking, the user can choose the fixed IP network for all Instances in a Cluster.
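For illustration, a Node Group that uses the second option carries the pool name in its ``floating_ip_pool`` field. The sketch below
follows the Node Group schema introduced by this change; the flavor, process names and the ``public`` pool name are placeholders,
not values mandated by Savanna.
.. sourcecode:: python
    # A minimal sketch of a Node Group definition with a floating IP pool.
    # "public" stands in for whatever floating IP pool exists in the deployment.
    node_group = {
        "name": "worker",
        "flavor_id": "42",
        "node_processes": ["datanode", "tasktracker"],
        "count": 2,
        "floating_ip_pool": "public",
    }
With ``use_floating_ips=True``, Savanna allocates a floating IP from the named pool for every instance of such a Node Group and releases it again when the instance is shut down.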
Anti-affinity
-------------

View File

@ -10,13 +10,13 @@
#os_admin_password=nova
#os_admin_tenant_name=admin
# When set to false, Savanna uses only internal IP of VMs.
# When set to true, Savanna expects OpenStack to auto-
# assign floating IPs to cluster nodes. Internal IPs will
# be used for inter-cluster communication, while floating
# ones will be used by Savanna to configure nodes. Also
# floating IPs will be exposed in service URLs. This option
# is ignored when "use_neutron" is set to True (boolean value)
# If set to True, Savanna will use floating IPs to
# communicate with instances. To make sure that all
# instances have floating IPs assigned in Nova Network,
# set "auto_assign_floating_ip=True" in nova.conf.
# If Neutron is used for networking, make sure that
# all Node Groups have the "floating_ip_pool" parameter
# defined.
#use_floating_ips=True
# The suffix of the node's FQDN. In nova-network that is

View File

@ -11,12 +11,13 @@
#port=8386
# When set to false, Savanna uses only internal IP of VMs.
# When set to true, Savanna expects OpenStack to auto-assign
# floating IPs to cluster nodes. Internal IPs will be used for
# inter-cluster communication, while floating ones will be
# used by Savanna to configure nodes. Also floating IPs will
# be exposed in service URLs. (boolean value)
# If set to True, Savanna will use floating IPs to
# communicate with instances. To make sure that all
# instances have floating IPs assigned in Nova Network,
# set "auto_assign_floating_ip=True" in nova.conf.
# If Neutron is used for networking, make sure that
# all Node Groups have the "floating_ip_pool" parameter
# defined.
#use_floating_ips=true
# The suffix of the node's FQDN. In nova-network that is

View File

@ -5,7 +5,6 @@ Flask>=0.10,<1.0
iso8601>=0.1.4
jsonschema>=1.3.0,!=1.4.0
kombu>=2.4.8
netaddr
paramiko>=1.8.0
pbr>=0.5.21,<1.0
# NOTE(slukjanov): <1.2.3 added to support old versions of python clients that

View File

@ -36,6 +36,7 @@ NODE_GROUP_DEFAULTS = {
"volumes_per_node": 0,
"volumes_size": 0,
"volume_mount_prefix": "/volumes/disk",
"floating_ip_pool": None
}
INSTANCE_DEFAULTS = {

View File

@ -70,6 +70,8 @@ class NodeGroup(object):
volumes_per_node
volumes_size
volume_mount_prefix
floating_ip_pool - Floating IP Pool name used to assign Floating IPs to
instances in this Node Group
count
instances - list of Instance objects
node_group_template_id
@ -166,6 +168,7 @@ class NodeGroupTemplate(object):
volumes_per_node
volumes_size
volume_mount_prefix
floating_ip_pool
"""

View File

@ -28,16 +28,16 @@ cli_opts = [
'headers and bodies')
]
cluster_node_opts = [
networking_opts = [
cfg.BoolOpt('use_floating_ips',
default=True,
help='When set to false, Savanna uses only internal IP of VMs.'
' When set to true, Savanna expects OpenStack to auto-'
'assign floating IPs to cluster nodes. Internal IPs will '
'be used for inter-cluster communication, while floating '
'ones will be used by Savanna to configure nodes. Also '
'floating IPs will be exposed in service URLs. This '
'option is ignored when "use_neutron" is set to True'),
help='If set to True, Savanna will use floating IPs to '
'communicate with instances. To make sure that all '
'instances have floating IPs assigned in Nova Network, '
'set "auto_assign_floating_ip=True" in nova.conf. '
'If Neutron is used for networking, make sure that '
'all Node Groups have the "floating_ip_pool" parameter '
'defined.'),
cfg.StrOpt('node_domain',
default='novalocal',
help="The suffix of the node's FQDN. In nova-network that is "
@ -64,7 +64,7 @@ cfg.set_defaults(log.log_opts, default_log_levels=[
CONF = cfg.CONF
CONF.register_cli_opts(cli_opts)
CONF.register_opts(cluster_node_opts)
CONF.register_opts(networking_opts)
ARGV = []

View File

@ -102,6 +102,7 @@ class NodeGroup(mb.SavannaBase):
'node_group_templates.id'))
node_group_template = relationship('NodeGroupTemplate',
backref="node_groups", lazy='joined')
floating_ip_pool = sa.Column(sa.String(36))
def to_dict(self):
d = super(NodeGroup, self).to_dict()
@ -181,6 +182,7 @@ class NodeGroupTemplate(mb.SavannaBase):
volumes_per_node = sa.Column(sa.Integer, nullable=False)
volumes_size = sa.Column(sa.Integer)
volume_mount_prefix = sa.Column(sa.String(80))
floating_ip_pool = sa.Column(sa.String(36))
class TemplatesRelation(mb.SavannaBase):
@ -209,6 +211,7 @@ class TemplatesRelation(mb.SavannaBase):
node_group_template = relationship('NodeGroupTemplate',
backref="templates_relations",
lazy='joined')
floating_ip_pool = sa.Column(sa.String(36))
## EDP objects: DataSource, JobOrigin, Job, Job Execution, JobBinary

View File

@ -142,3 +142,10 @@ class DeletionFailed(SavannaException):
def __init__(self, message=None):
if message:
self.message = message
class MissingFloatingNetworkException(SavannaException):
def __init__(self, ng_name):
self.message = ("Node Group %s is missing 'floating_ip_pool' "
"field" % ng_name)
self.code = "MISSING_FLOATING_NETWORK"

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
import urllib
from savanna import conductor as c
@ -28,6 +29,7 @@ from savanna.utils.openstack import nova
conductor = c.API
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
@ -89,12 +91,13 @@ def create_cluster(values):
cluster = conductor.cluster_update(ctx, cluster,
{"status": "Validating"})
LOG.info(g.format_cluster_status(cluster))
plugin.validate(cluster)
except Exception as ex:
except Exception as e:
with excutils.save_and_reraise_exception():
cluster = conductor.cluster_update(ctx, cluster,
{"status": "Error",
"status_description": str(ex)})
"status_description": str(e)})
LOG.info(g.format_cluster_status(cluster))
context.spawn("cluster-creating-%s" % cluster.id,

View File

@ -17,6 +17,7 @@ import datetime
from novaclient import exceptions as nova_exceptions
from oslo.config import cfg
import six
from savanna import conductor as c
from savanna import context
@ -41,10 +42,19 @@ def create_cluster(cluster):
LOG.info(g.format_cluster_status(cluster))
_create_instances(cluster)
# wait for all instances are up and accessible
# wait until all instances are up and their networks are ready
cluster = conductor.cluster_update(ctx, cluster, {"status": "Waiting"})
LOG.info(g.format_cluster_status(cluster))
cluster = _await_instances(cluster)
instances = get_instances(cluster)
_await_active(instances)
_assign_floating_ips(instances)
_await_networks(instances)
cluster = conductor.cluster_get(ctx, cluster)
# attach volumes
volumes.attach(cluster)
@ -65,38 +75,49 @@ def create_cluster(cluster):
_rollback_cluster_creation(cluster, ex)
def get_instances(cluster, instances_ids):
def get_instances(cluster, instances_ids=None):
inst_map = {}
for node_group in cluster.node_groups:
for instance in node_group.instances:
inst_map[instance.id] = instance
return [inst_map[id] for id in instances_ids]
if instances_ids is not None:
return [inst_map[id] for id in instances_ids]
else:
return [v for v in six.itervalues(inst_map)]
def scale_cluster(cluster, node_group_id_map, plugin):
ctx = context.ctx()
instances_list = []
instance_ids = []
try:
instances_list = _scale_cluster_instances(
instance_ids = _scale_cluster_instances(
cluster, node_group_id_map, plugin)
cluster = conductor.cluster_get(ctx, cluster)
cluster = clean_cluster_from_empty_ng(cluster)
cluster = _await_instances(cluster)
instances = get_instances(cluster, instance_ids)
volumes.attach_to_instances(get_instances(cluster, instances_list))
_await_active(instances)
_assign_floating_ips(instances)
_await_networks(instances)
cluster = conductor.cluster_get(ctx, cluster)
volumes.attach_to_instances(get_instances(cluster, instance_ids))
except Exception as ex:
LOG.warn("Can't scale cluster '%s' (reason: %s)", cluster.name, ex)
with excutils.save_and_reraise_exception():
cluster = conductor.cluster_get(ctx, cluster)
_rollback_cluster_scaling(cluster,
get_instances(cluster, instances_list),
get_instances(cluster, instance_ids),
ex)
instances_list = []
instance_ids = []
cluster = conductor.cluster_get(ctx, cluster)
clean_cluster_from_empty_ng(cluster)
@ -111,9 +132,9 @@ def scale_cluster(cluster, node_group_id_map, plugin):
# we should be here with valid cluster: if instances creation
# was not successful all extra-instances will be removed above
if instances_list:
if instance_ids:
_configure_instances(cluster)
return instances_list
return instance_ids
def _generate_anti_affinity_groups(cluster):
@ -136,7 +157,6 @@ def _create_instances(cluster):
#aa_groups = _generate_anti_affinity_groups(cluster)
aa_groups = {}
for node_group in cluster.node_groups:
count = node_group.count
conductor.node_group_update(ctx, node_group, {'count': 0})
@ -258,66 +278,90 @@ echo "%(private_key)s" > %(user_home)s/.ssh/id_rsa
}
def _await_instances(cluster):
"""Await all instances are in Active status and available."""
def _assign_floating_ips(instances):
for instance in instances:
node_group = instance.node_group
if node_group.floating_ip_pool:
networks.assign_floating_ip(instance.instance_id,
node_group.floating_ip_pool)
def _check_cluster_exists(cluster):
ctx = context.ctx()
all_up = False
is_accesible = set()
while not all_up:
all_up = True
# check if cluster still exists (it might have been removed)
cluster = conductor.cluster_get(ctx, cluster)
return cluster is not None
for node_group in cluster.node_groups:
for instance in node_group.instances:
if not _check_if_up(instance):
all_up = False
cluster = conductor.cluster_get(ctx, cluster)
def _await_networks(instances):
if not instances:
return
for node_group in cluster.node_groups:
for instance in node_group.instances:
if not _check_if_accessible(instance, is_accesible):
all_up = False
ips_assigned = set()
while len(ips_assigned) != len(instances):
if not _check_cluster_exists(instances[0].node_group.cluster):
return
for instance in instances:
if instance.id not in ips_assigned:
if networks.init_instances_ips(instance):
ips_assigned.add(instance.id)
context.sleep(1)
return cluster
ctx = context.ctx()
cluster = conductor.cluster_get(ctx, instances[0].node_group.cluster)
instances = get_instances(cluster, ips_assigned)
accessible_instances = set()
while len(accessible_instances) != len(instances):
if not _check_cluster_exists(instances[0].node_group.cluster):
return
for instance in instances:
if instance.id not in accessible_instances:
if _check_if_accessible(instance):
accessible_instances.add(instance.id)
context.sleep(1)
def _check_if_up(instance):
if instance.internal_ip and instance.management_ip:
return True
def _await_active(instances):
"""Await all instances are in Active status and available."""
if not instances:
return
active_ids = set()
while len(active_ids) != len(instances):
if not _check_cluster_exists(instances[0].node_group.cluster):
return
for instance in instances:
if instance.id not in active_ids:
if _check_if_active(instance):
active_ids.add(instance.id)
context.sleep(1)
def _check_if_active(instance):
server = nova.get_instance_info(instance)
if server.status == 'ERROR':
# TODO(slukjanov): replace with specific error
raise RuntimeError("node %s has error status" % server.name)
if server.status != 'ACTIVE':
return False
if len(server.networks) == 0:
return False
if not networks.init_instances_ips(instance, server):
return False
return True
return server.status == 'ACTIVE'
def _check_if_accessible(instance, cache):
if instance.id in cache:
return True
def _check_if_accessible(instance):
if not instance.internal_ip or not instance.management_ip:
# instance is not up yet
if not instance.management_ip:
return False
try:
# check if ssh is accessible and cloud-init
# script is finished generating id_rsa
exit_code, _ = instance.remote.execute_command(
exit_code, stdout = instance.remote.execute_command(
"ls .ssh/id_rsa", raise_when_error=False)
# don't log ls command failure
if exit_code:
return False
except Exception as ex:
@ -326,7 +370,6 @@ def _check_if_accessible(instance, cache):
return False
LOG.debug('Instance %s is accessible' % instance.instance_name)
cache.add(instance.id)
return True
@ -394,6 +437,8 @@ def _shutdown_instances(cluster):
def _shutdown_instance(instance):
ctx = context.ctx()
try:
if instance.node_group.floating_ip_pool:
networks.delete_floating_ip(instance.instance_id)
nova.client().servers.delete(instance.instance_id)
except nova_exceptions.NotFound:
#Just ignore non-existing instances

View File

@ -13,9 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import netaddr
from oslo.config import cfg
import six
from savanna import conductor as c
from savanna import context
@ -26,57 +25,45 @@ conductor = c.API
CONF = cfg.CONF
# NOTE(slukjanov): https://blueprints.launchpad.net/savanna?searchtext=ip
def init_instances_ips(instance, server):
if instance.internal_ip and instance.management_ip:
return True
if CONF.use_neutron:
return init_neutron_ips(instance, server)
else:
return init_nova_network_ips(instance, server)
def init_neutron_ips(instance, server):
ctx = context.ctx()
net_id = instance.node_group.cluster.neutron_management_network
net_name = nova.client().networks.get(net_id).label
internal_ip = server.networks.get(net_name, [None])[0]
management_ip = internal_ip
conductor.instance_update(ctx, instance, {"management_ip": management_ip,
"internal_ip": internal_ip})
return internal_ip
def init_nova_network_ips(instance, server):
def init_instances_ips(instance):
"""Extracts internal and management ips.
As internal ip will be used the first ip from the nova networks CIDRs.
If use_floating_ip flag is set than management ip will be the first
non-internal ip.
"""
ctx = context.ctx()
management_ip = instance.management_ip
internal_ip = instance.internal_ip
server = nova.get_instance_info(instance)
for network_label in server.networks:
nova_network = nova.client().networks.find(label=network_label)
network = netaddr.IPNetwork(nova_network.cidr)
for ip in server.networks[network_label]:
if netaddr.IPAddress(ip) in network:
internal_ip = instance.internal_ip or ip
management_ip = None
internal_ip = None
for network_label, addresses in six.iteritems(server.addresses):
for address in addresses:
if address['OS-EXT-IPS:type'] == 'fixed':
internal_ip = internal_ip or address['addr']
else:
management_ip = instance.management_ip or ip
management_ip = management_ip or address['addr']
if not CONF.use_floating_ips:
management_ip = internal_ip
conductor.instance_update(ctx, instance, {"management_ip": management_ip,
"internal_ip": internal_ip})
conductor.instance_update(context.ctx(), instance,
{"management_ip": management_ip,
"internal_ip": internal_ip})
return internal_ip and management_ip
def assign_floating_ip(instance_id, pool):
ip = nova.client().floating_ips.create(pool)
nova.client().servers.get(instance_id).add_floating_ip(ip)
def delete_floating_ip(instance_id):
fl_ips = nova.client().floating_ips.findall(instance_id=instance_id)
for fl_ip in fl_ips:
if CONF.use_neutron:
nova.client().floating_ips.delete(fl_ip.id)
else:
nova.client().floating_ips.delete(fl_ip.ip)

View File

@ -13,7 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
import novaclient.exceptions as nova_ex
from savanna import conductor as cond
from savanna import context
import savanna.exceptions as ex
import savanna.plugins.base as plugin_base
@ -22,6 +26,10 @@ import savanna.utils.openstack.keystone as keystone
import savanna.utils.openstack.nova as nova
CONF = cfg.CONF
conductor = cond.API
def _get_plugin_configs(plugin_name, hadoop_version, scope=None):
pl_confs = {}
for config in plugin_base.PLUGINS.get_plugin(
@ -182,6 +190,7 @@ def check_node_groups_in_cluster_templates(plugin_name, hadoop_version,
cluster_template_id):
c_t = api.get_cluster_template(id=cluster_template_id)
n_groups = c_t.to_wrapped_dict()['cluster_template']['node_groups']
check_network_config(n_groups)
for node_group in n_groups:
check_node_group_basic_fields(plugin_name, hadoop_version, node_group)
@ -200,6 +209,28 @@ def check_node_group_template_exists(ng_tmpl_id):
" doesn't exist" % ng_tmpl_id)
def check_network_config(node_groups):
if CONF.use_floating_ips and CONF.use_neutron:
for ng in node_groups:
if not _get_floating_ip_pool(ng):
raise ex.MissingFloatingNetworkException(ng.get('name'))
def _get_floating_ip_pool(node_group):
if node_group.get('floating_ip_pool'):
return node_group['floating_ip_pool']
if node_group.get('node_group_template_id'):
ctx = context.ctx()
ngt = conductor.node_group_template_get(
ctx,
node_group['node_group_template_id'])
if ngt.get('floating_ip_pool'):
return ngt['floating_ip_pool']
return None
## Cluster scaling
def check_resize(cluster, r_node_groups):

View File

@ -74,6 +74,9 @@ def check_cluster_create(data, **kwargs):
b.check_node_processes(data['plugin_name'], data['hadoop_version'],
data['anti_affinity'])
if data.get('node_groups'):
b.check_network_config(data['node_groups'])
def _get_cluster_field(cluster, field):
if cluster.get(field):

View File

@ -85,3 +85,5 @@ def check_cluster_scaling(data, cluster_id, **kwargs):
if data.get("add_node_groups"):
b.check_add_node_groups(cluster, data['add_node_groups'])
b.check_network_config(data['add_node_groups'])

View File

@ -64,6 +64,9 @@ NODE_GROUP_TEMPLATE_SCHEMA = {
"description": {
"type": "string",
},
"floating_ip_pool": {
"type": "string",
},
},
"additionalProperties": False,
"required": [

View File

@ -108,6 +108,7 @@ class ClusterTest(test_base.ConductorManagerTestCase):
ng.pop("volume_mount_prefix")
ng.pop("volumes_size")
ng.pop("volumes_per_node")
ng.pop("floating_ip_pool")
self.assertListEqual(SAMPLE_CLUSTER["node_groups"],
cl_db_obj["node_groups"])

View File

@ -26,6 +26,7 @@ SAMPLE_NGT = {
"hadoop_version": "test_version",
"name": "ngt_test",
"node_processes": ["p1", "p2"],
"floating_ip_pool": None,
"node_configs": {
"service_1": {
"config_1": "value_1"
@ -54,13 +55,15 @@ SAMPLE_CLT = {
"name": "ng_1",
"flavor_id": "42",
"node_processes": ["p1", "p2"],
"count": 1
"count": 1,
"floating_ip_pool": None
},
{
"name": "ng_2",
"flavor_id": "42",
"node_processes": ["p3", "p4"],
"count": 3
"count": 3,
"floating_ip_pool": None,
}
]

View File

@ -158,9 +158,48 @@ class NodePlacementTest(models_test_base.DbTestCase):
self.assertEqual(inst_number, 3)
def _make_ng_dict(name, flavor, processes, count):
return {'name': name, 'flavor_id': flavor, 'node_processes': processes,
'count': count}
class IpManagementTest(models_test_base.DbTestCase):
def setUp(self):
r.Resource._is_passthrough_type = _resource_passthrough
super(IpManagementTest, self).setUp()
@mock.patch('savanna.utils.openstack.nova.client')
@mock.patch('oslo.config.cfg')
def test_ip_assignment_use_no_floating(self, cfg, novaclient):
cfg.CONF.use_floating_ips = False
nova = _create_nova_mock(novaclient)
node_groups = [_make_ng_dict("test_group_1", "test_flavor",
["data node", "test tracker"], 2, 'pool'),
_make_ng_dict("test_group_2", "test_flavor",
["name node", "test tracker"], 1)]
ctx = context.ctx()
cluster = _create_cluster_mock(node_groups, ["data node"])
instances._create_instances(cluster)
cluster = conductor.cluster_get(ctx, cluster)
instances_list = instances.get_instances(cluster)
instances._assign_floating_ips(instances_list)
nova.floating_ips.create.assert_has_calls(
[mock.call("pool"),
mock.call("pool")],
any_order=False
)
self.assertEqual(nova.floating_ips.create.call_count, 2,
"Unexpected number of floating IPs created.")
def _make_ng_dict(name, flavor, processes, count, floating_ip_pool=None):
ng_dict = {'name': name, 'flavor_id': flavor, 'node_processes': processes,
'count': count}
if floating_ip_pool:
ng_dict.update({"floating_ip_pool": floating_ip_pool})
return ng_dict
def _create_cluster_mock(node_groups, aa):
@ -185,15 +224,36 @@ def _create_cluster_mock(node_groups, aa):
def _mock_instance(id):
instance1 = mock.Mock()
instance1.id = id
return instance1
server = mock.Mock()
server.id = id
server.instance_id = id
server.status = 'ACTIVE'
server.networks = ["n1", "n2"]
server.addresses = {'n1': [{'OS-EXT-IPS:type': 'fixed',
'addr': "{0}.{0}.{0}.{0}".format(id)}],
'n2': [{'OS-EXT-IPS:type': 'floating',
'addr': "{0}.{0}.{0}.{0}".format(id)}]}
server.add_floating_ip.side_effect = [True, True, True]
return server
def _mock_ip(id):
ip = mock.Mock()
ip.id = id
ip.ip = "{0}.{0}.{0}.{0}" .format(id)
return ip
def _mock_instances(count):
return [_mock_instance(str(i)) for i in range(1, count + 1)]
def _mock_ips(count):
return [_mock_ip(str(i)) for i in range(1, count + 1)]
def _generate_user_data_script(cluster):
script_template = """#!/bin/bash
echo "%(public_key)s" >> %(user_home)s/.ssh/authorized_keys
@ -209,7 +269,9 @@ echo "%(private_key)s" > %(user_home)s/.ssh/id_rsa
def _create_nova_mock(novaclient):
nova = mock.Mock()
novaclient.return_value = nova
nova.servers.create.side_effect = _mock_instances(3)
nova.servers.create.side_effect = _mock_instances(4)
nova.servers.get.return_value = _mock_instance(1)
nova.floating_ips.create.side_effect = _mock_ips(4)
images = mock.Mock()
images.username = "root"
nova.images.get = lambda x: images