Merge "Add code to configure cluster for external hdfs"

Authored by Jenkins on 2014-01-22 05:33:47 +00:00; committed by Gerrit Code Review
commit 8db45d01d1
7 changed files with 148 additions and 18 deletions

View File

@@ -13,6 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from six.moves.urllib import parse as urlparse

from savanna import conductor as c
from savanna import context
from savanna.plugins.general import utils as u
from savanna.utils import general as g

conductor = c.API


def put_file_to_hdfs(r, file, file_name, path, hdfs_user):
    r.write_file_to('/tmp/%s' % file_name, file)
@@ -33,3 +42,34 @@ def move_from_local(r, source, target, hdfs_user):
def create_dir(r, dir_name, hdfs_user):
    r.execute_command(
        'sudo su - -c "hadoop dfs -mkdir %s" %s' % (dir_name, hdfs_user))


def _get_cluster_hosts_information(host, cluster):
    for c in conductor.cluster_get_all(context.ctx()):
        if c.id == cluster.id:
            continue
        for i in u.get_instances(c):
            if i.instance_name == host:
                return g.generate_etc_hosts(c)
    return None


def configure_cluster_for_hdfs(cluster, data_source):
    host = urlparse.urlparse(data_source.url).hostname
    etc_hosts_information = _get_cluster_hosts_information(host, cluster)
    if etc_hosts_information is None:
        # IP address hasn't been resolved; the last chance is the VM itself
        return

    create_etc_host = 'sudo "cat /tmp/etc-hosts-update '
    create_etc_host += '/etc/hosts > /tmp/etc-hosts"'
    copy_etc_host = 'sudo "cat /tmp/etc-hosts > /etc/hosts"'

    for inst in u.get_instances(cluster):
        with inst.remote as r:
            r.write_file_to('/tmp/etc-hosts-update', etc_hosts_information)
            r.execute_command(create_etc_host)
            r.execute_command(copy_etc_host)
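Taken together, the two commands above merge the freshly written /tmp/etc-hosts-update into each instance's /etc/hosts. A rough local equivalent, purely as an illustrative sketch and not part of the commit (the real code runs the commands through the instance's remote object, and the function name below is invented):

import subprocess


def _update_etc_hosts(etc_hosts_information):
    # Stage the resolved host entries, as write_file_to() does on the instance.
    with open('/tmp/etc-hosts-update', 'w') as f:
        f.write(etc_hosts_information)
    # Merge the new entries with the current hosts file into a staging copy,
    # then install the merged copy as /etc/hosts (requires root).
    subprocess.check_call(
        'cat /tmp/etc-hosts-update /etc/hosts > /tmp/etc-hosts', shell=True)
    subprocess.check_call('cat /tmp/etc-hosts > /etc/hosts', shell=True)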

View File

@@ -116,6 +116,10 @@ def run_job(job_execution):
    #TODO(nprivalova): should be removed after all features implemented
    validate(input_source, output_source, job)

    for data_source in [input_source, output_source]:
        if data_source and data_source.type == 'hdfs':
            h.configure_cluster_for_hdfs(cluster, data_source)

    plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
    hdfs_user = plugin.get_hdfs_user()
    wf_dir = create_workflow_dir(u.get_jobtracker(cluster), job, hdfs_user)
@@ -209,8 +213,5 @@ def _append_slash_if_needed(path):
 #TODO(nprivalova): this validation should be removed after implementing
 # all features
 def validate(input_data, output_data, job):
-    if (input_data and input_data.type != 'swift') or\
-            (output_data and output_data.type != 'swift'):
-        raise RuntimeError
     if job.type not in ['Pig', 'MapReduce', 'Hive', 'Java', 'Jar']:
         raise RuntimeError

View File

@@ -97,16 +97,6 @@ class Engine:
         if not g.check_cluster_exists(instance.node_group.cluster):
             return

-    def _generate_etc_hosts(self, cluster):
-        hosts = "127.0.0.1 localhost\n"
-        for node_group in cluster.node_groups:
-            for instance in node_group.instances:
-                hosts += "%s %s %s\n" % (instance.internal_ip,
-                                         instance.fqdn(),
-                                         instance.hostname())
-        return hosts
-
     def _configure_instances(self, cluster):
         """Configure active instances.
@@ -114,7 +104,7 @@ class Engine:
         * setup passwordless login
         * etc.
         """
-        hosts_file = self._generate_etc_hosts(cluster)
+        hosts_file = g.generate_etc_hosts(cluster)

         with context.ThreadGroup() as tg:
             for node_group in cluster.node_groups:

View File

@@ -21,7 +21,7 @@ import savanna.service.edp.api as api

 data_source_type = {
     "type": "string",
-    "enum": ["swift"]
+    "enum": ["swift", "hdfs"]
 }

 job_configs = {
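
With "hdfs" added to the enum, a data source request such as the following would now pass the schema check (the values are made up for illustration; the field layout follows the swift example in the unit tests below):

hdfs_data_source = {
    "name": "external_hdfs_input",
    "url": "hdfs://namenode.example.org/user/hadoop/input",
    "type": "hdfs",
    "description": "input stored on an external HDFS cluster"
}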

View File

@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import urlparse

import savanna.exceptions as ex
import savanna.service.validations.edp.base as b
@@ -50,6 +52,24 @@ def check_data_source_create(data, **kwargs):
     b.check_data_source_unique_name(data['name'])

     if "swift" == data["type"]:
-        if not ("user" in data["credentials"]
-                and "password" in data["credentials"]):
-            raise ex.InvalidCredentials("Invalid credentials for Swift")
+        _check_swift_data_source_create(data)
+
+    if "hdfs" == data["type"]:
+        _check_hdfs_data_source_create(data)
+
+
+def _check_swift_data_source_create(data):
+    if not ("user" in data["credentials"]
+            and "password" in data["credentials"]):
+        raise ex.InvalidCredentials("Invalid credentials for Swift")
+
+
+def _check_hdfs_data_source_create(data):
+    if len(data['url']) == 0:
+        raise ex.InvalidException("HDFS url must not be empty")
+    url = urlparse.urlparse(data['url'])
+    if url.scheme != "hdfs":
+        raise ex.InvalidException("URL scheme must be 'hdfs'")
+    if not url.hostname:
+        raise ex.InvalidException("HDFS url is incorrect, "
+                                  "cannot determine a hostname")

View File

@@ -0,0 +1,68 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock

import savanna.exceptions as ex
from savanna.service import api
from savanna.service.validations.edp import data_source as ds
from savanna.tests.unit.service.validation import utils as u


class TestDataSourceValidation(u.ValidationTestCase):
    def setUp(self):
        self._create_object_fun = ds.check_data_source_create
        self.scheme = ds.DATA_SOURCE_SCHEMA
        api.plugin_base.setup_plugins()

    def test_swift_creation(self):
        data = {
            "name": "test_data_data_source",
            "url": "swift://1234",
            "type": "swift",
            "credentials": {
                "user": "user",
                "password": "password"
            },
            "description": "long description"
        }
        self._assert_types(data)

    @mock.patch("savanna.service.validations."
                "edp.base.check_data_source_unique_name")
    def test_hdfs_creation_wrong_schema(self, check_data_source_unique_name):
        check_data_source_unique_name.return_value = True

        data = {
            "name": "test_data_data_source",
            "url": "hdf://test_cluster/",
            "type": "hdfs",
            "description": "incorrect url schema"
        }
        with self.assertRaises(ex.InvalidException):
            ds.check_data_source_create(data)

    @mock.patch("savanna.service.validations."
                "edp.base.check_data_source_unique_name")
    def test_hdfs_creation_correct_url(self, check_data_source_unique_name):
        check_data_source_unique_name.return_value = True

        data = {
            "name": "test_data_data_source",
            "url": "hdfs://test_cluster/",
            "type": "hdfs",
            "description": "correct url schema"
        }
        ds.check_data_source_create(data)
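
A further case in the same style, shown here only as an illustrative extension and not part of this commit, would cover the empty-url branch of _check_hdfs_data_source_create:

    @mock.patch("savanna.service.validations."
                "edp.base.check_data_source_unique_name")
    def test_hdfs_creation_empty_url(self, check_data_source_unique_name):
        check_data_source_unique_name.return_value = True

        data = {
            "name": "test_data_data_source",
            "url": "",
            "type": "hdfs",
            "description": "empty url"
        }
        with self.assertRaises(ex.InvalidException):
            ds.check_data_source_create(data)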

View File

@@ -80,3 +80,14 @@ def clean_cluster_from_empty_ng(cluster):
    for ng in cluster.node_groups:
        if ng.count == 0:
            conductor.node_group_remove(ctx, ng)


def generate_etc_hosts(cluster):
    hosts = "127.0.0.1 localhost\n"
    for node_group in cluster.node_groups:
        for instance in node_group.instances:
            hosts += "%s %s %s\n" % (instance.internal_ip,
                                     instance.fqdn(),
                                     instance.hostname())
    return hosts
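
For reference, the string that generate_etc_hosts builds has the usual /etc/hosts layout; with invented instance names and addresses it would look roughly like:

    127.0.0.1 localhost
    10.50.0.3 test-cluster-master-001.novalocal test-cluster-master-001
    10.50.0.4 test-cluster-worker-001.novalocal test-cluster-worker-001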