Fix Spark EDP job failure in vanilla 2.8.2

The Vanilla plugin should specify the hadoop-openstack package
corresponding to the plugin version in its Spark configuration.

In Vanilla 2.8.2, the hadoop-openstack version was mistakenly pinned
to 2.7.1, so Spark jobs on Vanilla 2.8.2 failed with "No such file".

Change-Id: I5b54d69def7b457715ed60da3663a0153fe94be8
Shu Yingya 2018-02-27 19:15:38 +08:00
parent a7a22afbcc
commit 779cade385
6 changed files with 66 additions and 55 deletions
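
In essence: the Spark executor classpath default must track the plugin's
Hadoop version instead of staying hardcoded. A minimal sketch of that rule
(the helper name and version argument are illustrative, not part of the
patch; the jar path pattern is taken from the diffs below):

# Illustrative only: the patch itself hardcodes the jar per version
# module, but this is the invariant it restores.
HADOOP_OPENSTACK_JAR = (
    '/opt/hadoop/share/hadoop/tools/lib/hadoop-openstack-%s.jar')

def default_executor_classpath(plugin_version):
    # A 2.8.2 cluster must load hadoop-openstack-2.8.2.jar; pointing it
    # at the 2.7.1 jar produces the "No such file" failure above.
    return HADOOP_OPENSTACK_JAR % plugin_version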


@@ -85,20 +85,9 @@ PRIORITY_1_CONFS = [
     'yarn.scheduler.minimum-allocation-vcores'
 ]
 
-_default_executor_classpath = ":".join(
-    ['/opt/hadoop/share/hadoop/tools/lib/hadoop-openstack-2.7.1.jar'])
-
 SPARK_CONFS = {
     'Spark': {
         "OPTIONS": [
-            {
-                'name': 'Executor extra classpath',
-                'description': 'Value for spark.executor.extraClassPath'
-                               ' in spark-defaults.conf'
-                               ' (default: %s)' % _default_executor_classpath,
-                'default': '%s' % _default_executor_classpath,
-                'priority': 2,
-            },
             {
                 'name': 'Spark home',
                 'description': 'The location of the spark installation'
@@ -278,13 +267,6 @@ def is_data_locality_enabled(pctx, cluster):
                             ENABLE_DATA_LOCALITY.name, cluster)
 
 
-def _get_spark_opt_default(opt_name):
-    for opt in SPARK_CONFS["Spark"]["OPTIONS"]:
-        if opt_name == opt["name"]:
-            return opt["default"]
-    return None
-
-
 def generate_spark_env_configs(cluster):
     configs = []
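
The option removed here (and re-added per version below) supplies
spark.executor.extraClassPath in spark-defaults.conf, as its description
states. A hedged sketch of the line it ultimately produces; the rendering
helper is hypothetical, not Sahara code:

def render_executor_classpath(jar_path):
    # The option's value ends up as spark.executor.extraClassPath in
    # spark-defaults.conf, per the option description above.
    return 'spark.executor.extraClassPath %s\n' % jar_path

print(render_executor_classpath(
    '/opt/hadoop/share/hadoop/tools/lib/hadoop-openstack-2.8.2.jar'))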


@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import copy
+
 from oslo_config import cfg
 import six
@@ -41,6 +43,22 @@ OOZIE_DEFAULT = x.load_hadoop_xml_defaults(
 HIVE_DEFAULT = x.load_hadoop_xml_defaults(
     'plugins/vanilla/v2_7_1/resources/hive-default.xml')
 
+_default_executor_classpath = ":".join(
+    ['/opt/hadoop/share/hadoop/tools/lib/hadoop-openstack-2.7.1.jar'])
+
+SPARK_CONFS = copy.deepcopy(c_helper.SPARK_CONFS)
+
+SPARK_CONFS['Spark']['OPTIONS'].append(
+    {
+        'name': 'Executor extra classpath',
+        'description': 'Value for spark.executor.extraClassPath'
+                       ' in spark-defaults.conf'
+                       ' (default: %s)' % _default_executor_classpath,
+        'default': '%s' % _default_executor_classpath,
+        'priority': 2,
+    }
+)
+
 XML_CONFS = {
     "Hadoop": [CORE_DEFAULT],
     "HDFS": [HDFS_DEFAULT],
@@ -83,9 +101,16 @@ def _init_all_configs():
     return configs
 
 
+def _get_spark_opt_default(opt_name):
+    for opt in SPARK_CONFS["Spark"]["OPTIONS"]:
+        if opt_name == opt["name"]:
+            return opt["default"]
+    return None
+
+
 def _get_spark_configs():
     spark_configs = []
-    for service, config_items in six.iteritems(c_helper.SPARK_CONFS):
+    for service, config_items in six.iteritems(SPARK_CONFS):
         for item in config_items['OPTIONS']:
             cfg = p.Config(name=item["name"],
                            description=item["description"],
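
Note the copy.deepcopy above: the version module extends the shared Spark
options without mutating the dict that every other plugin version reads. A
standalone sketch of the aliasing hazard this avoids (all names hypothetical):

import copy

# Hypothetical stand-in for the shared c_helper.SPARK_CONFS.
COMMON = {'Spark': {'OPTIONS': []}}

aliased = COMMON  # plain assignment: both names point at one dict
aliased['Spark']['OPTIONS'].append({'name': 'v2.7.1-only option'})
print(len(COMMON['Spark']['OPTIONS']))  # 1 -- the shared default leaked

isolated = copy.deepcopy(COMMON)  # independent per-version copy
isolated['Spark']['OPTIONS'].append({'name': 'v2.8.2-only option'})
print(len(COMMON['Spark']['OPTIONS']))  # still 1 -- no cross-version leak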


@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import copy
+
 from oslo_config import cfg
 import six
@@ -41,6 +43,22 @@ OOZIE_DEFAULT = x.load_hadoop_xml_defaults(
 HIVE_DEFAULT = x.load_hadoop_xml_defaults(
     'plugins/vanilla/v2_8_2/resources/hive-default.xml')
 
+_default_executor_classpath = ":".join(
+    ['/opt/hadoop/share/hadoop/tools/lib/hadoop-openstack-2.8.2.jar'])
+
+SPARK_CONFS = copy.deepcopy(c_helper.SPARK_CONFS)
+
+SPARK_CONFS['Spark']['OPTIONS'].append(
+    {
+        'name': 'Executor extra classpath',
+        'description': 'Value for spark.executor.extraClassPath'
+                       ' in spark-defaults.conf'
+                       ' (default: %s)' % _default_executor_classpath,
+        'default': '%s' % _default_executor_classpath,
+        'priority': 2,
+    }
+)
+
 XML_CONFS = {
     "Hadoop": [CORE_DEFAULT],
     "HDFS": [HDFS_DEFAULT],
@@ -83,9 +101,16 @@ def _init_all_configs():
     return configs
 
 
+def _get_spark_opt_default(opt_name):
+    for opt in SPARK_CONFS["Spark"]["OPTIONS"]:
+        if opt_name == opt["name"]:
+            return opt["default"]
+    return None
+
+
 def _get_spark_configs():
     spark_configs = []
-    for service, config_items in six.iteritems(c_helper.SPARK_CONFS):
+    for service, config_items in six.iteritems(SPARK_CONFS):
         for item in config_items['OPTIONS']:
             cfg = p.Config(name=item["name"],
                            description=item["description"],
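
For illustration, here is the duplicated lookup helper exercised standalone
against the v2_8_2 default (a self-contained sketch; this SPARK_CONFS literal
is a trimmed stand-in for the real one):

SPARK_CONFS = {'Spark': {'OPTIONS': [{
    'name': 'Executor extra classpath',
    'default': '/opt/hadoop/share/hadoop/tools/lib/'
               'hadoop-openstack-2.8.2.jar',
}]}}

def _get_spark_opt_default(opt_name):
    # Linear scan by option name; None for unknown options.
    for opt in SPARK_CONFS['Spark']['OPTIONS']:
        if opt_name == opt['name']:
            return opt['default']
    return None

assert _get_spark_opt_default('Executor extra classpath').endswith(
    'hadoop-openstack-2.8.2.jar')
assert _get_spark_opt_default('No such option') is None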


@@ -123,19 +123,6 @@ class TestConfigHelper(base.SaharaTestCase):
         get_config_value.assert_called_once_with(self.pctx, target,
                                                  name, self.cluster)
 
-    def test_get_spark_opt_default(self):
-        c_helper.SPARK_CONFS = {'Spark': {
-            'OPTIONS': [{'name': 'test_name',
-                         'default': 'test'}]}
-        }
-        opt_name = 'tt'
-        default = c_helper._get_spark_opt_default(opt_name)
-        self.assertIsNone(default)
-        opt_name = 'test_name'
-        default = c_helper._get_spark_opt_default(opt_name)
-        self.assertEqual(default, 'test')
-
     def test_generate_spark_env_configs(self):
         configs = 'HADOOP_CONF_DIR=/opt/hadoop/etc/hadoop\n' \
                   'YARN_CONF_DIR=/opt/hadoop/etc/hadoop'


@@ -16,7 +16,6 @@
 import mock
 
 from sahara.plugins import provisioning as p
-from sahara.plugins.vanilla.hadoop2 import config_helper as h_helper
 from sahara.plugins.vanilla.v2_7_1 import config_helper as v_helper
 from sahara.tests.unit import base
@@ -49,17 +48,14 @@ class TestConfigHelper(base.SaharaTestCase):
         init_configs = v_helper._init_all_configs()
         self.assertEqual(init_configs, configs)
 
+    def test_get_spark_opt_default(self):
+        opt_name = 'Executor extra classpath'
+        _default_executor_classpath = ":".join(
+            ['/opt/hadoop/share/hadoop/tools/lib/hadoop-openstack-2.7.1.jar'])
+        default = v_helper._get_spark_opt_default(opt_name)
+        self.assertEqual(default, _default_executor_classpath)
+
     def test_get_spark_configs(self):
-        h_helper.SPARK_CONFS = {
+        v_helper.SPARK_CONFS = {
             'Spark': {
                 'OPTIONS': [{
                     'name': 'test',
                     'description': 'This is a test',
                     'default': 'default',
                     'priority': 1
                 }]
             }
         }
         spark_configs = v_helper._get_spark_configs()
         for i in spark_configs:
             self.assertIsInstance(i, p.Config)


@@ -16,7 +16,6 @@
 import mock
 
 from sahara.plugins import provisioning as p
-from sahara.plugins.vanilla.hadoop2 import config_helper as h_helper
 from sahara.plugins.vanilla.v2_8_2 import config_helper as v_helper
 from sahara.tests.unit import base
@@ -49,17 +48,14 @@ class TestConfigHelper(base.SaharaTestCase):
         init_configs = v_helper._init_all_configs()
         self.assertEqual(init_configs, configs)
 
+    def test_get_spark_opt_default(self):
+        opt_name = 'Executor extra classpath'
+        _default_executor_classpath = ":".join(
+            ['/opt/hadoop/share/hadoop/tools/lib/hadoop-openstack-2.8.2.jar'])
+        default = v_helper._get_spark_opt_default(opt_name)
+        self.assertEqual(default, _default_executor_classpath)
+
     def test_get_spark_configs(self):
-        h_helper.SPARK_CONFS = {
+        v_helper.SPARK_CONFS = {
             'Spark': {
                 'OPTIONS': [{
                     'name': 'test',
                     'description': 'This is a test',
                     'default': 'default',
                     'priority': 1
                 }]
             }
         }
         spark_configs = v_helper._get_spark_configs()
         for i in spark_configs:
             self.assertIsInstance(i, p.Config)
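
Taken together, a quick check of the fixed behavior, mirroring the new test
above (assumes a sahara tree with this patch applied):

from sahara.plugins.vanilla.v2_8_2 import config_helper as v_helper

default = v_helper._get_spark_opt_default('Executor extra classpath')
# With this patch the v2_8_2 plugin resolves its own jar, not 2.7.1's.
assert default.endswith('hadoop-openstack-2.8.2.jar')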